Featured image of post 学成在线媒资管理模块

学成在线媒资管理模块

本文是黑马学成在线的媒资管理模块的搭建

本文描述了黑马springcloud项目学成在线的媒资管理模块的搭建

模块需求分析

模块介绍

  • 媒资管理系统是每个在线教育平台所必须具备的,查阅百度百科对它的定义如下:

媒体资源管理(Media Asset Management,MAM)系统是建立在多媒体、网络、数据库和数字存储等先进技术基础上的一个对各种媒体及内容(如视/音频资料、文本文件、图表等)进行数字化存储、管理以及应用的总体解决方案,包括数字媒体的采集、编目、管理、传输和编码转换等所有环节。其主要是满足媒体资源拥有者收集、保存、查找、编辑、发布各种信息的要求,为媒体资源的使用者提供访问内容的便捷方法,实现对媒体资源的高效管理,大幅度提高媒体资源的价值。

  • 每个教学机构都可以在媒资系统管理自己的教学资源,包括:视频、教案等文件。

  • 目前媒资管理的主要管理对象是视频、图片、文档等,包括:媒资文件的查询、文件上传、视频处理等。

  • 主要的功能如下:

    • 媒资查询:教学机构查询自己所拥有的媒资信息。
    • 文件上传:包括上传图片、上传文档、上传视频。
    • 视频处理:视频上传成功,系统自动对视频进行编码处理。
    • 文件删除:教学机构删除自己上传的媒资文件。
  • 下图是课程编辑与发布的整体流程,通过下图可以看到媒资管理在整体流程的位置:

业务流程

上传图片

  • 教学机构人员在课程信息编辑页面上传课程图片,课程图片统一记录在媒资管理系统。

上传视频

  1. 教学机构人员进入媒资管理列表查询自己上传的媒资文件。点击“媒资管理”,进入媒资管理列表页面查询本机构上传的媒资文件。
  2. 教育机构用户在"媒资管理"页面中点击 “上传视频” 按钮。点击“上传视频”打开上传页面。
  3. 选择要上传的文件,自动执行文件上传,视频上传成功会自动处理。

处理视频

  • 对需要转码处理的视频系统会自动对其处理,处理后生成视频的URL。
  • 处理视频没有用户界面,完全是后台自动执行。

审核媒资

  • 审核媒资包括程序自动审核和人工审核,程序可以通过鉴黄接口https://www.aliyun.com/product/lvwang?spm=5176.19720258.J_3207526240.51.e93976f4rSq796审核视频,对有异议的视频由人工进行审核。
  • 执行步骤
    1. 运营用户登入运营平台并进入媒资管理页面,查找待审核媒资
    2. 点击列表中媒资名称链接,可预览该媒资,若是视频,则播放视频。
    3. 点击列表中某媒资后的"审核" 按钮,开始审核。
    4. 选择审核结果,输入审核意见,完成媒资的审批过程。

绑定媒资

  • 课程计划创建好后需要绑定媒资文件,比如:如果课程计划绑定了视频文件,进入课程在线学习界面后,点课程计划名称则在线播放视频。
  • 如何将课程计划绑定媒资呢?
    1. 教育机构用户进入课程管理页面并编辑某一个课程,在"课程大纲"标签页的某一小节后可点击”添加视频“。
    2. 弹出添加视频对话框,可通过视频关键字搜索已审核通过的视频媒资。

数据模型

  • 本模块媒资文件相关的数据表如下:

    • 媒资文件表
    媒资文件表结构
    • 待处理视频表
    待处理视频表
    • 视频处理历史表
    视频处理历史表
  • 媒资文件与课程计划绑定关系表如下:

    课程计划与媒资关系表

搭建模块环境

架构的问题分析

  • 当前要开发的是媒资管理服务,目前为止共三个微服务:内容管理、系统管理、媒资管理,如下图:
模块关系图
  • 后期还会添加更多的微服务,当前这种由前端直接请求微服务的方式存在弊端:

    • 如果在前端对每个请求地址都配置绝对路径,非常不利于系统维护,比如下边代码中请求系统管理服务的地址使用的是localhost

    前端直接请求微服务

    • 当系统上线后这里需要改成公网的域名,如果这种地址非常多则非常麻烦。
    • 基于这个问题可以采用网关来解决,如下图:
    • 这样在前端的代码中只需要指定每个接口的相对路径,如下所示:

    前端确定相对路径

    • 在前端代码的一个固定的地方在接口地址前统一加网关的地址,每个请求统一到网关,由网关将请求转发到具体的微服务。
  • 为什么所有的请求先到网关呢?

    • 有了网关就可以对请求进行路由,路由到具体的微服务,减少外界对接微服务的成本,比如:400电话,路由则可以根据请求路径进行路由、根据host地址进行路由等, 当微服务有多个实例时可以通过负载均衡算法进行路由,如下:
    网关到微服务
    • 另外,网关还可以实现权限控制、限流等功能。
  • 项目采用Spring Cloud Gateway作为网关,网关在请求路由时需要知道每个微服务实例的地址,项目使用Nacos作用服务发现中心和配置中心,整体的架构图如下:

网关整体的架构图

  • 流程如下:

    1. 微服务启动,将自己注册到Nacos,Nacos记录了各微服务实例的地址。
    2. 网关从Nacos读取服务列表,包括服务名称、服务地址等。
    3. 请求到达网关,网关将请求路由到具体的微服务。
  • 要使用网关首先搭建Nacos,Nacos有两个作用:

    • 服务发现中心。

      • 微服务将自身注册至Nacos,网关从Nacos获取微服务列表。
    • 配置中心。

      • 微服务众多,它们的配置信息也非常复杂,为了提供系统的可维护性,微服务的配置信息统一在Nacos配置。

搭建Nacos

服务发现中心

  • 运行Nacos

    • 使用课程提供的Nacos,一般我们启动课程资料给我们的虚拟机时,Nacos会自动执行,但是Nacos需要Mysql作为数据库,故Nacos必须在Mysql之后运行,单Mysql一般不会随虚拟机启动而启动,所以我们需要在启动Mysql后,在重启以下Nacos,保证不出错。
    • 启动Nacos后,访问Nacos页面:虚拟机ip地址+8848/nacos,使用课程资料中的虚拟机的访问http://192.168.101.65:8848/nacos/即可。
    • 账号:nacos,密码:nacos
  • 在搭建Nacos服务发现中心之前需要搞清楚两个概念:namespace和group

    • namespace:用于区分环境、比如:开发环境、测试环境、生产环境。

    • group:用于区分项目,比如:xuecheng-plus项目、xuecheng2.0项目

  • 首先在nacos配置namespace:

    • 点击左侧菜单“命名空间”进入命名空间管理界面
    • 点击“新建命名空间”,填写命名空间的相关信息,如下图

    新建开发环境命名空间

    • 使用相同的办法再创建“测试环境”、“生产环境"的命名空间。

    • 创建成功,如下图:

    命名空间创建完毕

[!CAUTION]

注意:如果使用dev1010命名空间,在下边的配置中对namespace配置为dev1010。

  • 命名空间创建完毕后,首先完成各服务注册到Naocs,下边将内容管理服务注册到nacos中。

    1. 在xuecheng-plus-parent中添加依赖管理

      1
      2
      3
      4
      5
      6
      7
      
      <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-alibaba-dependencies</artifactId>
          <version>${spring-cloud-alibaba.version}</version>
          <type>pom</type>
          <scope>import</scope>
      </dependency>
      
    2. 在内容管理模块的接口工程、系统管理模块的接口工程中添加如下依赖

      1
      2
      3
      4
      
      <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
      </dependency>
      
    3. 配置nacos的地址

      • 在系统管理的接口工程的配置文件中配置如下信息:
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      
      #微服务配置
      spring:
        application:
          name: system-service
        cloud:
          nacos:
            server-addr: 192.168.101.65:8848
            discovery:
              namespace: dev
              group: xuecheng-plus-project
      
      • 在内容管理的接口工程的配置文件中配置如下信息:
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      
      #微服务配置
      spring:
        application:
          name: content-api
        cloud:
          nacos:
            server-addr: 192.168.101.65:8848
            discovery:
              namespace: dev
              group: xuecheng-plus-project
      
    4. 重启内容管理服务、系统管理服务。

      • 待微服务启动成功,进入Nacos服务查看服务列表

      nacos注册中心

      • 在 “开发环境” 命名空间下有两个服务这说明内容管理微服务和系统管理微服务在Nacos注册成功。

配置中心

配置三要素
  • 搭建完成Nacos服务发现中心,下边搭建Nacos为配置中心,其目的就是通过Nacos去管理项目的所有配置。

  • 先将项目中的配置文件分分类:

    • 每个项目特有的配置

      • 是指该配置只在有些项目中需要配置,或者该配置在每个项目中配置的值不同。

      • 比如:spring.application.name每个项目都需要配置但值不一样,以及有些项目需要连接数据库而有些项目不需要,有些项目需要配置消息队列而有些项目不需要。

    • 项目所公用的配置

      • 是指在若干项目中配置内容相同的配置。比如:redis的配置,很多项目用的同一套redis服务所以配置也一样。

      • 另外还需要知道nacos如何去定位一个具体的配置文件,即:namespace、group、dataid.

  • 定位配置文件

    • 通过namespace、group找到具体的环境和具体的项目。

    • 通过dataid找到具体的配置文件,dataid有三部分组成

      • 比如:content-service-dev.yaml配置文件 由(content-service)-(dev). (yaml)三部分组成

      • content-service:第一部分,它是在application.yaml中配置的应用名,即spring.application.name的值。

      • dev:第二部分,它是环境名,通过spring.profiles.active指定,

      • Yaml: 第三部分,它是配置文件 的后缀,目前nacos支持properties、yaml等格式类型,本项目选择yaml格式类型。

    • 所以,如果我们要配置content-service工程的配置文件:

      • 在开发环境中配置content-service-dev.yaml

      • 在测试环境中配置content-service-test.yaml

      • 在生产环境中配置content-service-prod.yaml

    • 我们启动项目中传入spring.profiles.active的参数决定引用哪个环境的配置文件,例如:传入spring.profiles.active=dev表示使用dev环境的配置文件即content-service-dev.yaml。

配置content-service
  • 下边以开发环境为例对content-service工程的配置文件进行配置,再开发环境命名空间中,添加content-service-dev.yaml配置

content-service-dev.yaml配置

  • 输入data id、group以及配置文件内容。

  • 为什么没在nacos中配置下边的内容 ?

1
2
3
spring:
  application:
    name: content-service
  • 因为刚才说了dataid第一部分就是spring.application.name,再根据${spring.application.name}-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}作为文件id,来读取配置。

  • 而读取配置文件的顺序如下

    1. bootstrap.yml
    2. nacos中的配置文件
    3. 本地application.yml
  • 所以spring.application.name不在nacos中配置,而是要在工程的本地(bootstrap.yaml)进行配置,然后nacos才知道怎么读取配置文件。

  • 在content-service工程的test/resources 中添加bootstrap.yaml,内容如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
spring:
  application:
    name: content-service
  cloud:
    nacos:
      server-addr: 192.168.101.65:8848
      discovery:
        namespace: dev
        group: xuecheng-plus-project
      config:
        namespace: dev
        group: xuecheng-plus-project
        file-extension: yaml
        refresh-enabled: true

#profiles默认为dev
  profiles:
    active: dev
  • 在内容管理模块的接口工程和service工程配置依赖:
1
2
3
4
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
  • 配置完成,运行content-service工程 的单元测试文件,能否正常测试,跟踪单元测试方法可以正常读取数据库的数据,说明从nacos读取配置信息正常。

  • 通过运行观察控制台打印出下边的信息,NacosRestTemplate.java通过Post方式与nacos服务端交互读取配置信息。

1
[NacosRestTemplate.java:476] - HTTP method: POST, url: http://192.168.101.65:8848/nacos/v1/cs/configs/listener, body: {Listening-Configs=content-service.yamlxuecheng-plus-projectdevcontent-service-dev.yamlxuecheng-plus-project88459b1483b8381eccc2ef462bd59182devcontent-servicexuecheng-plus-projectdev, tenant=dev}
配置content-api
  • 以相同的方法配置content-api工程的配置文件,在nacos中的开发环境中配置content-api-dev.yaml,内容如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
server:
  servlet:
    context-path: /content
  port: 63040

# 日志文件配置路径
logging:
  config: classpath:log4j2-dev.xml

# swagger 文档配置
swagger:
  title: "学成在线内容管理系统"
  description: "内容系统管理系统对课程相关信息进行业务管理数据"
  base-package: com.xuecheng.content
  enabled: true
  version: 1.0.0
  • 在content-api工程的本地配置bootstrap.yaml,内容如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#微服务配置
spring:
  application:
    name: content-api
  cloud:
    nacos:
      server-addr: 192.168.101.65:8848
      discovery:
        namespace: dev
        group: xuecheng-plus-project
      config:
        namespace: dev
        group: xuecheng-plus-project
        file-extension: yaml
        refresh-enabled: true
        extension-configs:
          - data-id: content-service-${spring.profiles.active}.yaml
            group: xuecheng-plus-project
            refresh: true
  profiles:
    active: dev
  • 注意:因为api接口工程依赖了service工程的jar,所以这里使用extension-configs扩展配置文件的方式引用service工程所用到的配置文件。
1
2
3
4
        extension-configs:
          - data-id: content-service-${spring.profiles.active}.yaml
            group: xuecheng-plus-project
            refresh: true
  • 如果添加多个扩展文件,继续在下添加即可,如下:
1
2
3
4
5
6
7
        extension-configs:
          - data-id: content-service-${spring.profiles.active}.yaml
            group: xuecheng-plus-project
            refresh: true
          - data-id: 填写文件 dataid
            group: xuecheng-plus-project
            refresh: true           
  • 启动content-api工程,查询控制台是否打印出了请求nacos的日志,如下:
1
[NacosRestTemplate.java:476] - HTTP method: POST, url: http://192.168.101.65:8848/nacos/v1/cs/configs/listener
  • 使用Httpclient测试课程查询接口是否可以正常查询。

公用配置

  • 还有一个优化的点是如何在nacos中配置项目的公用配置呢?

  • nacos提供了shared-configs可以引入公用配置。

  • 在content-api中配置了swagger,所有的接口工程 都需要配置swagger,这里就可以将swagger的配置定义为一个公用配置,哪个项目用引入即可。

  • 单独在xuecheng-plus-common分组下创建xuecheng-plus的公用配置,进入nacos的开发环境,添加swagger-dev.yaml公用配置

配置swagger公用配置

  • 删除接口工程中对swagger的配置。
  • 项目使用shared-configs可以引入公用配置。在接口工程的本地配置文件 中引入公用配置,如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
spring:
  application:
    name: content-api
  cloud:
    nacos:
      server-addr: 192.168.101.65:8848
      discovery:
        namespace: dev
        group: xuecheng-plus-project
      config:
        namespace: dev
        group: xuecheng-plus-project
        file-extension: yaml
        refresh-enabled: true
        extension-configs:
          - data-id: content-service-${spring.profiles.active}.yaml
            group: xuecheng-plus-project
            refresh: true
        shared-configs:
          - data-id: swagger-${spring.profiles.active}.yaml
            group: xuecheng-plus-common
            refresh: true
          - data-id: logging-${spring.profiles.active}.yaml
            group: xuecheng-plus-common
            refresh: true
  profiles:
    active: dev
  • 再以相同的方法配置日志的公用配置。

日志公用配置

  • 在接口工程和业务工程,引入loggin-dev.yaml公用配置文件
1
2
3
 - data-id: logging-${spring.profiles.active}.yaml
            group: xuecheng-plus-common
            refresh: true

配置完成,重启content-api接口工程,访问]http://localhost:63040/content/swagger-ui.html 查看swagger接口文档是否可以正常访问,查看控制台log4j2日志输出是否正常。

配置优先级

  • 到目前为止已将所有微服务的配置统一在nacos进行配置,用到的配置文件有本地的配置文件 bootstrap.yaml和nacos上的配置文件,SpringBoot读取配置文件的顺序如下:

SpringBoot读取配置文件顺序

  1. 加在bootstarp.yaml,获取nacos地址、配置文件dataidid
  2. 根据dataid读取nacos中的配置
  3. 加在本地配置(例如application.yaml/application-dev.yaml),与nacos拉取的配置合并
  • 引入配置文件的形式有:

    • 以项目应用名方式引入(bootstrap)
    • 以扩展配置文件方式引入
    • 以共享配置文件方式引入
    • 本地配置文件
  • 各配置文件的优先级:项目应用名配置文件 > 扩展配置文件 > 共享配置文件 > 本地配置文件。

  • 有时候我们在测试程序时直接在本地加一个配置进行测试,比如下边的例子:

    • 我们想启动两个内容管理微服务,此时需要在本地指定不同的端口,通过VM Options参数,在IDEA配置启动参数
    • 通过-D指定参数名和参数值,参数名即在bootstrap.yml中配置的server.port。

    通过-D进行本地配置

    • 启动ContentApplication2,发现端口仍然是63040,这说明本地的配置没有生效。
    • 这时我们想让本地最优先,可以在nacos配置文件 中配置如下即可实现:
    1
    2
    3
    4
    5
    
    #配置本地优先
    spring:
     cloud:
      config:
        override-none: true
    
    • 再次启动ContentApplication2,端口为63041。

导入配置文件

  • 课程资料中提供了系统用的所有配置文件nacos_config_export.zip,下边将nacos_config_export.zip导入nacos。
  • 进入具体的命名空间,点击“导入配置”

nacos导入配置1

  • 打开导入窗口
nacos导入配置2
  • 相同的配置选择跳过配置(上图选错了)。
  • 点击“上传文件”选择资料中的nacos_config_export.zip开始导入。

配置系统管理服务

配置system-api
  • bootstrap中的内容

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    #微服务配置
    spring:
      application:
        name: system-api
      cloud:
        nacos:
          server-addr: 192.168.101.65:8848
          discovery:  # 注册中心
            namespace: dev
            group: xuecheng-plus-project
          config:   # 配置中心
            namespace: dev
            group: xuecheng-plus-project
            file-extension: yaml
            refresh-enabled: true
            extension-configs:
              - data-id: system-service-${spring.profiles.active}.yaml
                group: xuecheng-plus-project
                refresh: true
            shared-configs:
              - data-id: swagger-${spring.profiles.active}.yaml
                group: xuecheng-plus-common
                refresh: true
              - data-id: logging-${spring.profiles.active}.yaml
                group: xuecheng-plus-common
                refresh: true
      profiles:
        active: dev
    
  • nacos中的内容已经导入不要配置,即dataid为:system-api-dev.yaml的配置

配置system-service
  • service项目中没有用到配置无需添加
  • nacos中的内容已经导入不要配置,即dataid为:system-service-dev.yaml的配置

搭建Gateway

  • 本项目使用Spring Cloud Gateway作为网关,下边创建网关工程。

  • 新建一个网关工程。

创建网关工程
  • 工程结构
网关工程结构
  • 添加依赖:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<parent>
    <groupId>com.xuecheng</groupId>
    <artifactId>xuecheng-plus-parent</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <relativePath>../xuecheng-plus-parent</relativePath>
</parent>
<artifactId>xuecheng-plus-gateway</artifactId>

<dependencies>

    <!--网关-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>

    <!--服务发现中心-->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!-- 排除 Spring Boot 依赖的日志包冲突 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Spring Boot 集成 log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>

</dependencies>
  • 配置网关的bootstrap.yaml配置文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#微服务配置
spring:
  application:
    name: gateway
  cloud:
    nacos:
      server-addr: 192.168.101.65:8848
      discovery:
        namespace: dev
        group: xuecheng-plus-project
      config:
        namespace: dev
        group: xuecheng-plus-project
        file-extension: yaml
        refresh-enabled: true
        shared-configs:
          - data-id: logging-${spring.profiles.active}.yaml
            group: xuecheng-plus-common
            refresh: true


  profiles:
    active: dev
  • 在nacos上配置网关策略
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
server:
  port: 63010 # 网关端口
spring:
  cloud:
    gateway:
#      filter:
#        strip-prefix:
#          enabled: true
      routes: # 网关路由配置
        - id: content-api # 路由id,自定义,只要唯一即可
          # uri: http://127.0.0.1:8081 # 路由的目标地址 http就是固定地址
          uri: lb://content-api # 路由的目标地址 lb就是负载均衡,后面跟服务名称
          predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
            - Path=/content/** # 这个是按照路径匹配,只要以/content/开头就符合要求
#          filters:
#            - StripPrefix=1
        - id: system-api
          # uri: http://127.0.0.1:8081
          uri: lb://system-api
          predicates:
            - Path=/system/**
#          filters:
#            - StripPrefix=1
        - id: media-api
          # uri: http://127.0.0.1:8081
          uri: lb://media-api
          predicates:
            - Path=/media/**
#          filters:
#            - StripPrefix=1
  • 启动网关工程,通过网关工程访问微服务进行测试。
  • 在http-client-env.json中配置网关的地址
http-client-env.json中配置网关的地址
  • 使用httpclient测试课程查询接口,如下:
1
2
3
4
5
6
7
8
### 课程查询列表
POST {{gateway_host}}/content/course/list?pageNo=2&pageSize=1
Content-Type: application/json

{
  "auditStatus": "202002",
  "courseName": ""
}
  • 运行,观察是否可以正常访问接口
  • 网关工程搭建完成即可将前端工程中的接口地址改为网关的地址
前端配置网关地址

搭建媒资工程

  • 至此网关、Nacos已经搭建完成,下边将媒资工程导入项目。

  • 从黑马提供的课程资料中获取媒资工程 xuecheng-plus-media,拷贝到项目工程根目录。

  • 下边做如下配置:

    1. 创建媒资数据库xc_media,并导入资料目录中的xcplus_media.sql
    2. 修改nacos上的media-service-dev.yaml配置文件中的数据库链接信息
    3. 重启media-api工程只要能正常启动成功即可,稍后根据需求写接口。

分布式文件系统

什么是分布式文件系统

  • 要理解分布式文件系统首先了解什么是文件系统。
  • 查阅维基百科:

计算机文件系统是一种存储和组织计算机数据的方法,它使得对其访问和查找变得容易,文件系统使用文件树形目录的抽象逻辑概念代替了硬盘和光盘等物理设备使用数据块的概念,用户使用文件系统来保存数据不必关心数据实际保存在硬盘(或者光盘)的地址为多少的数据块上,只需要记住这个文件的所属目录和文件名。在写入新数据之前,用户不必关心硬盘上的那个块地址没有被使用,硬盘上的存储空间管理(分配和释放)功能由文件系统自动完成,用户只需要记住数据被写入到了哪个文件中。

  • 文件系统是负责管理和存储文件的系统软件,操作系统通过文件系统提供的接口去存取文件,用户通过操作系统访问磁盘上的文件。
  • 下图指示了文件系统所处的位置:
文件系统的位置
  • 常见的文件系统:FAT16/FAT32、NTFS、HFS、UFS、APFS、XFS、Ext4等 。

  • 现在有个问题,一此短视频平台拥有大量的视频、图片,这些视频文件、图片文件该如何存储呢?如何存储可以满足互联网上海量用户的浏览。

  • 分布式文件系统就是海量用户查阅海量文件的方案。

  • 我们阅读维基百科去理解分布式文件系统的定义:

相对于本机端的文件系统而言,分布式文件系统(英语:Distributed file system, DFS),或是网络文件系统(英语:Network File System),是一种允许文件透过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间。

  • 通过概念可以简单理解为:一个计算机无法存储海量的文件,通过网络将若干计算机组织起来共同去存储海量的文件,去接收海量用户的请求,这些组织起来的计算机通过网络进行通信,如下图:
分布式文件系统结构图
  • 好处:

    1. 一台计算机的文件系统处理能力扩充到多台计算机同时处理。
    2. 一台计算机挂了还有另外副本计算机提供数据。
    3. 每台计算机可以放在不同的地域,这样用户就可以就近访问,提高访问速度。
  • 市面上有哪些分布式文件系统的产品呢?

    1. NFS

    网络文件系统(英语:Network File System,缩写作 NFS)是一种分布式文件系统,力求客户端主机可以访问服务器端文件,并且其过程与访问本地存储时一样,它由Sun微系统(已被甲骨文公司收购)开发,于1984年发布[1]

    它基于开放网络运算远程过程调用(ONC RPC)系统:一个开放、标准的RFC系统,任何人或组织都可以依据标准实现它。

    • 特点:
      1. 在客户端上映射NFS服务器的驱动器。
      2. 客户端通过网络访问NFS服务器的硬盘完全透明。
    1. GFS

    Google文件系统(英语:Google File System,缩写为GFS或GoogleFS),一种专有分布式文件系统,由Google公司开发,运行于Linux平台上[1]。尽管Google在2003年公布了该系统的一些技术细节,但Google并没有将该系统的软件部分作为开源软件发布[2]

    • 特点:
      1. GFS采用主从结构,一个GFS集群由一个master和大量的chunkserver组成。
      2. master存储了数据文件的元数据,一个文件被分成了若干块存储在多个chunkserver中。
      3. 用户从master中获取数据元信息,向chunkserver存储数据。
    1. HDFS

    HDFS,是Hadoop Distributed File System的简称,是Hadoop抽象文件系统的一种实现。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。 HDFS的文件分布在集群机器上,同时提供副本进行容错及可靠性保证。例如客户端写入读取文件的直接操作都是分布在集群各个机器上的,没有单点性能压力。

    • 特点
      1. HDFS采用主从结构,一个HDFS集群由一个名称结点和若干数据结点组成。
      2. 名称结点存储数据的元信息,一个完整的数据文件分成若干块存储在数据结点。
      3. 客户端从名称结点获取数据的元信息及数据分块的信息,得到信息客户端即可从数据块来存取数据。
    1. 云计算厂家

      • 阿里云对象存储服务(Object Storage Service,简称 OSS),是阿里云提供的海量、安全、低成本、高可靠的云存储服务。其数据设计持久性不低于 99.9999999999%(12 个 9),服务设计可用性(或业务连续性)不低于 99.995%。

      官方网站:https://www.aliyun.com/product/oss

      • 百度对象存储BOS提供稳定、安全、高效、高可扩展的云存储服务。您可以将任意数量和形式的非结构化数据存入BOS,并对数据进行管理和处理。BOS支持标准、低频、冷和归档存储等多种存储类型,满足多场景的存储需求。

      官方网站:https://cloud.baidu.com/product/bos.html

MinIo

介绍

  • 本项目采用MinIO构建分布式文件系统,MinIO 是一个非常轻量的服务,可以很简单的和其他应用的结合使用,它兼容亚马逊 S3 云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。

  • 它一大特点就是轻量,使用简单,功能强大,支持各种平台,单个文件最大5TB,兼容 Amazon S3接口,提供了 Java、Python、GO等多版本SDK支持。

  • MinIO集群采用去中心化共享架构,每个结点是对等关系,通过Nginx可对MinIO进行负载均衡访问。

  • 去中心化有什么好处?

    • 在大数据领域,通常的设计理念都是无中心和分布式。Minio分布式模式可以帮助你搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置。

    • 它将分布在不同服务器上的多块硬盘组成一个对象存储服务。由于硬盘分布在不同的节点上,分布式Minio避免了单点故障。如下图:

    minio结构图
    • Minio使用纠删码技术来保护数据,它是一种恢复丢失和损坏数据的数学算法,它将数据分块冗余的分散存储在各各节点的磁盘上,所有的可用磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验块会分散的存储在这8块硬盘上。

    • 使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍然可以恢复数据。 比如上边集合中有4个以内的硬盘损害仍可保证数据恢复,不影响上传和下载,如果多于一半的硬盘坏了则无法恢复。

测试Docker环境

  • 开发阶段和生产阶段统一使用Docker下的MINIO。

  • 在下发的虚拟机中已安装了MinIO的镜像和容器,执行docker start minio(或者sh /data/soft /restart.sh)启动Docker下的MinIO

  • 启动完成登录MinIO查看是否正常。

  • 访问http://192.168.101.65:9000

minio界面
  • 本项目创建两个buckets(点击 create bucket 即可):

    • mediafiles: 普通文件

    • video:视频文件

    [!CAUTION]

    注意:

    • 为bucket取名时,不能用大写字母
    • 创建完bucket后,点击manage,进去修改访问权限为public

SDK

上传文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.4.3</version>
</dependency>
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>4.8.1</version>
</dependency>
  • 在media-service工程添加此依赖。

  • 参数说明:

  • 需要三个参数才能连接到minio服务。

    参数 说明
    Endpoint 对象存储服务的URL
    Access Key Access key就像用户ID,可以唯一标识你的账户。
    Secret Key Secret key是你账户的密码。
  • 官方的示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import io.minio.BucketExistsArgs;
import io.minio.MakeBucketArgs;
import io.minio.MinioClient;
import io.minio.UploadObjectArgs;
import io.minio.errors.MinioException;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
public class FileUploader {
  public static void main(String[] args)throws IOException, NoSuchAlgorithmException, InvalidKeyException {
    try {
      // Create a minioClient with the MinIO server playground, its access key and secret key.
      // 创建MinIO客户端,连接参数就是上述表格中的三个参数,192.168.101.65:9000、minioadmin、minioadmin
      MinioClient minioClient =
          MinioClient.builder()
              .endpoint("https://play.min.io")
              .credentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")
              .build();
      // Make 'asiatrip' bucket if not exist.
      // 该步骤为创建bucket,但是我们已经在minio的可视化界面创建完毕,故无需再创建 
      boolean found =
          minioClient.bucketExists(BucketExistsArgs.builder().bucket("asiatrip").build());
      if (!found) {
        // Make a new bucket called 'asiatrip'.
        minioClient.makeBucket(MakeBucketArgs.builder().bucket("asiatrip").build());
      } else {
        System.out.println("Bucket 'asiatrip' already exists.");
      }
      // Upload '/home/user/Photos/asiaphotos.zip' as object name 'asiaphotos-2015.zip' to bucket
      // 'asiatrip'.
      // 将 '/home/user/Photos/asiaphotos.zip' 文件命名为 'asiaphotos-2015.zip'
      // 并上传到 'asiatrip' 里(示例代码创建的bucket)  
      minioClient.uploadObject(
          UploadObjectArgs.builder()
              .bucket("asiatrip")
              .object("asiaphotos-2015.zip")
              .filename("/home/user/Photos/asiaphotos.zip")
              .build());
      // 这段输出也没有用,可以直接删掉  
      System.out.println(
          "'/home/user/Photos/asiaphotos.zip' is successfully uploaded as "
              + "object 'asiaphotos-2015.zip' to bucket 'asiatrip'.");
    } catch (MinioException e) {
      System.out.println("Error occurred: " + e);
      System.out.println("HTTP trace: " + e.httpTrace());
    }
  }
}
  • 参考示例在media-service工程中测试上传文件功能,
  • 首先创建一个用于测试的bucket(testbucket),修改访问权限为public
  • 在xuecheng-plus-media-service工程 的test下编写测试代码如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.xuecheng.media;

import io.minio.BucketExistsArgs;
import io.minio.MakeBucketArgs;
import io.minio.MinioClient;
import io.minio.UploadObjectArgs;
import io.minio.errors.MinioException;

import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

/**
 * @description 测试MinIO
 * @version 1.0
 */
public class MinioTest {

    static MinioClient minioClient =
            MinioClient.builder()
                    .endpoint("http://192.168.101.65:9000")
                    .credentials("minioadmin", "minioadmin")
                    .build();

   //上传文件
    @Test
    public  void upload() {
        try {
            UploadObjectArgs testbucket = UploadObjectArgs.builder()
                    .bucket("testbucket")
//                    .object("test001.mp4")
                    .object("001/test001.mp4")//添加子目录
                    .filename("D:\\develop\\upload\\1mp4.temp")
                    .contentType("video/mp4")//默认根据扩展名确定文件内容类型,也可以指定
                    .build();
            minioClient.uploadObject(testbucket);
            System.out.println("上传成功");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("上传失败");
        }

    }

}
  • 执行upload方法,分别测试向桶的根目录上传文件以及子目录上传文件。

  • 上传成功,通过web控制台查看文件,并预览文件。

  • 说明:

    • 设置contentType可以通过com.j256.simplemagic.ContentType枚举类查看常用的mimeType(媒体类型)

    • 通过扩展名得到mimeType,代码如下:

    1
    2
    3
    4
    5
    6
    7
    
        //根据扩展名取出mimeType
        ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(".mp4");
        String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;//通用mimeType,字节流
    //通过扩展名来获得mimeType的类型
     if(extensionMatch!=null){
                mimeType = extensionMatch.getMimeType();
            }
    
    • 完善上边的代码,如下:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    @Test
        public  void upload() {
            //根据扩展名取出mimeType
            ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(".mp4");
            String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;//通用mimeType,字节流
            //通过扩展名来获得mimeType的类型
            if(extensionMatch!=null){
                mimeType = extensionMatch.getMimeType();
            }
            try {
                UploadObjectArgs testbucket = UploadObjectArgs.builder()
                        .bucket("testbucket")
    //                    .object("test001.mp4")
                        .object("001/test001.mp4")//添加子目录
                        .filename("D:\\develop\\upload\\1mp4.temp")
                        .contentType(mimeType)//默认根据扩展名确定文件内容类型,也可以指定
                        .build();
                minioClient.uploadObject(testbucket);
                System.out.println("上传成功");
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("上传失败");
            }
    
        }
    
删除文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Test
public void delete(){
    try {
        minioClient.removeObject(
               RemoveObjectArgs.builder().bucket("testbucket").object("001/test001.mp4").build());
        System.out.println("删除成功");
    } catch (Exception e) {
       e.printStackTrace();
        System.out.println("删除失败");
    }
}
查询文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
//查询文件
@Test
public void getFile() {
    GetObjectArgs getObjectArgs = GetObjectArgs.builder().bucket("testbucket").object("test001.mp4").build();
    try(
        FilterInputStream inputStream = minioClient.getObject(getObjectArgs);
        FileOutputStream outputStream = new FileOutputStream(new File("D:\\develop\\upload\\1_2.mp4"));
     ) {
        IOUtils.copy(inputStream,outputStream);
     } catch (Exception e) {
        e.printStackTrace();
     }
}
  • 校验文件的完整性,对文件计算出md5值,比较原始文件的md5和目标文件的md5,一致则说明完整
1
2
3
4
5
6
7
8
//校验文件的完整性对文件的内容进行md5
FileInputStream fileInputStream1 = new FileInputStream(new File("D:\\develop\\upload\\1.mp4"));
String source_md5 = DigestUtils.md5Hex(fileInputStream1);
FileInputStream fileInputStream = new FileInputStream(new File("D:\\develop\\upload\\1a.mp4"));
String local_md5 = DigestUtils.md5Hex(fileInputStream);
if(source_md5.equals(local_md5)){
    System.out.println("下载成功");
}

上传图片

需求分析

业务流程

  • 课程图片是宣传课程非常重要的信息,在新增课程界面上传课程图片,也可以修改课程图片。

  • 上传课程图片总体上包括两部分:

    • 上传课程图片前端请求媒资管理服务将文件上传至分布式文件系统,并且在媒资管理数据库保存文件信息。
    • 上传图片成功保存图片地址到课程基本信息表中。
  • 详细流程如下:

    1. 前端进入上传图片界面
    2. 上传图片,请求媒资管理服务。
    3. 媒资管理服务将图片文件存储在MinIO。
    4. 媒资管理记录文件信息到数据库。
    5. 前端请求内容管理服务保存课程信息,在内容管理数据库保存图片地址。

数据模型

  • 涉及到的数据表有:课程信息表中的图片字段、媒资数据库的文件表,下边主要看媒资数据库的文件表:
媒资文件表结构

准备环境

  • 在nacos配置中minio的相关信息,进入media-service-dev.yaml:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.101.65:3306/xcplus_media?serverTimezone=UTC&userUnicode=true&useSSL=false&
    username: root
    password: mysql
  cloud:
   config:
    override-none: true

minio:
  endpoint: http://192.168.101.65:9000
  accessKey: minioadmin
  secretKey: minioadmin
  bucket:
    files: mediafiles
    videofiles: video
  • 在media-service工程编写minio的配置类:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.xuecheng.media.config;

import io.minio.MinioClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @description minio配置
 * @version 1.0
 */
 @Configuration
public class MinioConfig {


 @Value("${minio.endpoint}")
 private String endpoint;
 @Value("${minio.accessKey}")
 private String accessKey;
 @Value("${minio.secretKey}")
 private String secretKey;

 @Bean
 public MinioClient minioClient() {

  MinioClient minioClient =
          MinioClient.builder()
                  .endpoint(endpoint)
                  .credentials(accessKey, secretKey)
                  .build();
  return minioClient;
 }
}

[!CAUTION]

注意:若要使用**@ConfigurationPropertites**,要再类中添加Getter和Setter方法,该注解就是通过这两个方法进行配置的注入的,并且由于该类不在启动类的包下,故要在启动类上加上**@EnableConfigurationProperties(MinioConfig.class)**注解

接口定义

  • 根据需求分析,下边进行接口定义,此接口定义为一个通用的上传文件接口,可以上传图片或其它文件。

  • 首先分析接口:

  • 请求地址:/media/upload/coursefile

  • 请求内容:Content-Type: multipart/form-data;form-data; name=“filedata”; filename=“具体的文件名称”

  • 响应参数:文件信息,如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
{
  "id": "a16da7a132559daf9e1193166b3e7f52",
  "companyId": 1232141425,
  "companyName": null,
  "filename": "1.jpg",
  "fileType": "001001",
  "tags": "",
  "bucket": "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg",
  "fileId": "a16da7a132559daf9e1193166b3e7f52",
  "url": "/testbucket/2022/09/12/a16da7a132559daf9e1193166b3e7f52.jpg",
  "timelength": null,
  "username": null,
  "createDate": "2022-09-12T21:57:18",
  "changeDate": null,
  "status": "1",
  "remark": "",
  "auditStatus": null,
  "auditMind": null,
  "fileSize": 248329
}
  • 定义上传响应模型类:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package com.xuecheng.media.model.dto;

import com.xuecheng.media.model.po.MediaFiles;
import lombok.Data;
import lombok.ToString;

/**
 * @description 上传普通文件成功响应结果
 * @version 1.0
 */
//以后可能需要再这里面添加参数,故先定义出来
 @Data
public class UploadFileResultDto extends MediaFiles {
  

}
  • 定义接口如下:
1
2
3
4
5
6
@ApiOperation("上传文件")
@RequestMapping(value = "/upload/coursefile", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public UploadFileResultDto upload(@RequestPart("filedata") MultipartFile upload) throws IOException {

    return null;
}
  • 接口定义好后可以用httpclient工具测试一下

  • 使用httpclient测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
### 上传文件
POST {{media_host}}/media/upload/coursefile
Content-Type: multipart/form-data; boundary=WebAppBoundary

--WebAppBoundary
Content-Disposition: form-data; name="filedata";
### 这里用自己的文件名
filename="1.jpg"
Content-Type: application/octet-stream

### 这里用自己想要上传的文件的路径
< d:/develop/upload/1.jpg

接口开发

Dao开发

  • 根据需求分析DAO层实现向media_files表插入一条记录,使用media_files表生成的mapper即可。、

Service开发

  • Service方法需要提供一个更加通用的保存文件的方法。

  • 定义请求参数类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.xuecheng.media.model.dto;

import com.xuecheng.media.model.po.MediaFiles;
import lombok.Data;
import lombok.ToString;

/**
 * @description 上传普通文件请求参数
 * @author Mr.M
 * @date 2022/9/12 18:49
 * @version 1.0
 */
 @Data
public class UploadFileParamsDto {

 /**
  * 文件名称
  */
 private String filename;


 /**
  * 文件类型(文档,音频,视频)
  */
 private String fileType;
 /**
  * 文件大小
  */
 private Long fileSize;

 /**
  * 标签
  */
 private String tags;

 /**
  * 上传人
  */
 private String username;

 /**
  * 备注
  */
 private String remark;

}
  • 定义service接口:
1
2
3
4
5
6
7
8
/**
 * 上传文件
 * @param companyId 机构id
 * @param uploadFileParamsDto 上传文件信息
 * @param localFilePath 文件磁盘路径
 * @return 文件信息
 */
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, String localFilePath);
  • 实现方法如下:
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139

@Autowired
MinioClient minioClient;

@Autowired
MediaFilesMapper mediaFilesMapper;

//普通文件桶
@Value("${minio.bucket.files}")
private String bucket_Files;


//获取文件默认存储目录路径 年/月/日
private String getDefaultFolderPath() {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    String folder = sdf.format(new Date()).replace("-", "/")+"/";
    return folder;
}

//获取文件的md5
private String getFileMd5(File file) {
    try (FileInputStream fileInputStream = new FileInputStream(file)) {
        String fileMd5 = DigestUtils.md5Hex(fileInputStream);
        return fileMd5;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}


private String getMimeType(String extension){
    if(extension==null)
        extension = "";
    //根据扩展名取出mimeType
    ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);
    //通用mimeType,字节流
    String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;
    if(extensionMatch!=null){
        mimeType = extensionMatch.getMimeType();
    }
    return mimeType;
}
/**
 * @description 将文件写入minIO
 * @param localFilePath  文件地址
 * @param bucket  桶
 * @param objectName 对象名称
 * @return void
 */
public boolean addMediaFilesToMinIO(String localFilePath,String mimeType,String bucket, String objectName) {
    try {
        UploadObjectArgs testbucket = UploadObjectArgs.builder()
                .bucket(bucket)
                .object(objectName)
                .filename(localFilePath)
                .contentType(mimeType)
                .build();
        minioClient.uploadObject(testbucket);
        log.debug("上传文件到minio成功,bucket:{},objectName:{}",bucket,objectName);
        System.out.println("上传成功");
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        log.error("上传文件到minio出错,bucket:{},objectName:{},错误原因:{}",bucket,objectName,e.getMessage(),e);
        XueChengPlusException.cast("上传文件到文件系统失败");
    }
    return false;
}

/**
 * @description 将文件信息添加到文件表
 * @param companyId  机构id
 * @param fileMd5  文件md5值
 * @param uploadFileParamsDto  上传文件的信息
 * @param bucket  桶
 * @param objectName 对象名称
 * @return com.xuecheng.media.model.po.MediaFiles
 */
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket,String objectName){
    //从数据库查询文件
    MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
    if (mediaFiles == null) {
        mediaFiles = new MediaFiles();
        //拷贝基本信息
        BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
        mediaFiles.setId(fileMd5);
        mediaFiles.setFileId(fileMd5);
        mediaFiles.setCompanyId(companyId);
        mediaFiles.setUrl("/" + bucket + "/" + objectName);
        mediaFiles.setBucket(bucket);
        mediaFiles.setFilePath(objectName);
        mediaFiles.setCreateDate(LocalDateTime.now());
        mediaFiles.setAuditStatus("002003");
        mediaFiles.setStatus("1");
        //保存文件信息到文件表
        int insert = mediaFilesMapper.insert(mediaFiles);
        if (insert < 0) {
            log.error("保存文件信息到数据库失败,{}",mediaFiles.toString());
            XueChengPlusException.cast("保存文件信息失败");
        }
        log.debug("保存文件信息到数据库成功,{}",mediaFiles.toString());

    }
    return mediaFiles;

}
@Transactional
@Override
public UploadFileResultDto uploadFile(Long companyId, UploadFileParamsDto uploadFileParamsDto, String localFilePath) {
    File file = new File(localFilePath);
    if (!file.exists()) {
        XueChengPlusException.cast("文件不存在");
    }
    //文件名称
    String filename = uploadFileParamsDto.getFilename();
    //文件扩展名
    String extension = filename.substring(filename.lastIndexOf("."));
    //文件mimeType
    String mimeType = getMimeType(extension);
    //文件的md5值
    String fileMd5 = getFileMd5(file);
    //文件的默认目录
    String defaultFolderPath = getDefaultFolderPath();
    //存储到minio中的对象名(带目录)
    String  objectName = defaultFolderPath + fileMd5 + exension;
    //将文件上传到minio
    boolean b = addMediaFilesToMinIO(localFilePath, mimeType, bucket_files, objectName);
    //文件大小
    uploadFileParamsDto.setFileSize(file.length());
    //将文件信息存储到数据库
    MediaFiles mediaFiles = addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_files, objectName);
    //准备返回数据
    UploadFileResultDto uploadFileResultDto = new UploadFileResultDto();
    BeanUtils.copyProperties(mediaFiles, uploadFileResultDto);
    return uploadFileResultDto;

}

完善接口层

  • 完善接口层代码 :
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
     * 上传文件
     * @param filedata
     * @return
     */
    @ApiOperation("上传文件")
    @PostMapping(value = "/upload/coursefile",consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public UploadFileResultDto upload(@RequestPart("filedata")MultipartFile filedata) throws IOException {
        //1.设置文件信息
        UploadFileParamsDto uploadFileParamsDto = new UploadFileParamsDto();
        //1.1文件大小
        uploadFileParamsDto.setFileSize(filedata.getSize());
        //1.2文件名称
        uploadFileParamsDto.setFilename(filedata.getOriginalFilename());
        //1.3文件类型
        uploadFileParamsDto.setFileType("001002");
        //2.创建临时文件,将文件临时存储到本地
        File tempFile = File.createTempFile("minio", "temp");
        filedata.transferTo(tempFile);
        //3.临时存储在本地的该文件路径
        String localFilePath = tempFile.getAbsolutePath();

        //4.TODO 获取companyId
        Long companyId = 1232141425L;
        //5.调用service上传文件
        return mediaFileService.uploadFile(companyId,uploadFileParamsDto,localFilePath);
    }

接口测试

  • 首先使用httpclient测试
1
2
3
4
5
6
7
8
9
### 上传文件
POST {{media_host}}/media/upload/coursefile
Content-Type: multipart/form-data; boundary=WebAppBoundary

--WebAppBoundary
Content-Disposition: form-data; name="filedata"; filename="1.jpg"
Content-Type: application/octet-stream

< d:/develop/upload/1.jpg
  • 再进行前后端联调测试
    1. 在新增课程、编辑课程界面上传图,保存课程信息后再次进入编辑课程界面,查看是否可以正常保存课程图片信息。
    2. 上图图片完成后,进入媒资管理,查看文件列表中是否有刚刚上传的图片信息。

Service事务优化

  • 上边的service方法优化后并测试通过,现在思考关于uploadFile方法的是否应该开启事务。
  • 目前是在uploadFile方法上添加@Transactional,当调用uploadFile方法前会开启数据库事务,如果上传文件过程时间较长那么数据库的事务持续时间就会变长,这样数据库链接释放就慢,最终导致数据库链接不够用。
  • 我们只将addMediaFilesToDb方法添加事务控制即可,uploadFile方法上的@Transactional注解去掉。
  • 优化后如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket,String objectName){

   //从数据库查询文件
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
if (mediaFiles == null) {
    mediaFiles = new MediaFiles();
    //拷贝基本信息
    BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
    mediaFiles.setId(fileMd5);
    mediaFiles.setFileId(fileMd5);
    mediaFiles.setCompanyId(companyId);
    mediaFiles.setUrl("/" + bucket + "/" + objectName);
    mediaFiles.setBucket(bucket);
    mediaFiles.setFilePath(objectName);
    mediaFiles.setCreateDate(LocalDateTime.now());
    mediaFiles.setAuditStatus("002003");
    mediaFiles.setStatus("1");
    //保存文件信息到文件表
    int insert = mediaFilesMapper.insert(mediaFiles);
    if (insert < 0) {
        log.error("保存文件信息到数据库失败,{}",mediaFiles.toString());
        XueChengPlusException.cast("保存文件信息失败");
    }
    log.debug("保存文件信息到数据库成功,{}",mediaFiles.toString());

}
return mediaFiles;

}
  • 我们人为在int insert = mediaFilesMapper.insert(mediaFiles);下边添加一个异常代码int a=1/0;

  • 测试是否事务控制。很遗憾,事务控制失败。

  • 方法上已经添加了@Transactional注解为什么该方法不能被事务控制呢?

  • 如果是在uploadFile方法上添加@Transactional注解就可以控制事务,去掉则不行。

  • 现在的问题其实是一个非事务方法调同类一个事务方法,事务无法控制,这是为什么?

  • 下边分析原因:

    • 如果在uploadFile方法上添加@Transactional注解,代理对象执行此方法前会开启事务,如下图:
    主方法上添加事物
    • 如果在uploadFile方法上没有@Transactional注解,代理对象执行此方法前不进行事务控制,如下图:
    被调用的方法上添加事物
  • 所以判断该方法是否可以事务控制必须保证

    1. 是通过代理对象调用此方法,
    2. 且此方法上添加了@Transactional注解。
  • 现在在addMediaFilesToDb方法上添加@Transactional注解,也不会进行事务控制是因为并不是通过代理对象执行的addMediaFilesToDb方法。

  • 为了判断在uploadFile方法中去调用addMediaFilesToDb方法是否是通过代理对象去调用,我们可以打断点跟踪。

跟踪,判断是否时代理对象调用了方法

  • 我们发现在uploadFile方法中去调用addMediaFilesToDb方法不是通过代理对象去调用。

  • 如何解决呢?通过代理对象去调用addMediaFilesToDb方法即可解决。

  • MediaFileService的实现类中注入MediaFileService的代理对象,如下:

    1
    2
    
    @Autowired
    MediaFileService currentProxy;
    
  • 将addMediaFilesToDb方法提成接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/**
 * @description 将文件信息添加到文件表
 * @param companyId  机构id
 * @param fileMd5  文件md5值
 * @param uploadFileParamsDto  上传文件的信息
 * @param bucket  桶
 * @param objectName 对象名称
 * @return com.xuecheng.media.model.po.MediaFiles
 */

public MediaFiles addMediaFilesToDb(Long companyId,String fileMd5,UploadFileParamsDto uploadFileParamsDto,String bucket,String objectName);
  • 通过代理对象调用addMediaFilesToDb:
1
2
3
4
.....
//写入文件表
MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_files, objectName);
 ....
  • 再次测试事务是否可以正常控制。

上传视频

需求分析

业务流程

  1. 教学机构人员进入媒资管理列表查询自己上传的媒资文件。
  2. 教育机构用户在"媒资管理"页面中点击 “上传视频” 按钮。
  3. 选择要上传的文件,自动执行文件上传。
  4. 视频上传成功会自动处理,处理完成可以预览视频。

断点续传技术

什么是断点续传

  • 通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。

  • 什么是断点续传,引用百度百科:

    断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。

  • 断点续传流程如下图:

断点续传流程

  • 流程如下:
    1. 前端上传前先把文件分成块
    2. 一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传
    3. 各分块上传完成最后在服务端合并文件

分块与合并测试

  • 为了更好的理解文件分块上传的原理,下边用java代码测试文件的分块与合并。

  • 文件分块的流程如下:

    1. 获取源文件长度
    2. 根据设定的分块文件的大小计算出块数
    3. 从源文件读数据依次向每一个块文件写数据。
  • 测试代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//分块测试
    @Test
    public void testChunk() throws Exception {
        //源文件
        File sourceFile = new File("D:\\MyFile\\BUCT\\大一\\小学期\\英语语音\\英语语音模仿材料" +
                "\\I believe I can fly\\R. Kelly - I Believe I Can Fly_高清.mp4");
        //设置分块文件的存储路径
        String chunkFilePath = "D:\\MyFile\\BUCT\\chunk\\";
        File chunkFolder = new File(chunkFilePath);
        if (!chunkFolder.exists()) {
            chunkFolder.mkdirs();
        }
        //设置分块文件的大小
        int chunkSize = 1024*1024*5;
        //分块文件的数量
        int chunkNum = (int) Math.ceil(sourceFile.length() * 1.0 / chunkSize);
        //使用流从文件中读取数据,向文件中写数据
        //读流
        RandomAccessFile rafRead = new RandomAccessFile(sourceFile,"r");
        //缓存区
        byte [] bytes =new byte[1024];
        //循环分块文件的数量的次数
        for (int i = 0; i < chunkNum; i++) {
            //设置分块文件
            File chunkFile =new File(chunkFilePath + i);
            if(chunkFile.exists()){
                chunkFile.delete();
            }
            boolean newFile = chunkFile.createNewFile();
            if(newFile){
                int length = -1;
                RandomAccessFile rafReadAndWrite = new RandomAccessFile(chunkFile,"rw");
                //读取文件,读取结束时,length会被设置为-1
                while((length= rafRead.read(bytes))!=-1){
                    //往分块中写数据
                    rafReadAndWrite.write(bytes,0,length);
                    if(chunkFile.length() >= chunkSize){
                        break;
                    }
                }
                rafReadAndWrite.close();
            }
        }
        rafRead.close();
    }
  • 文件合并流程:
    1. 找到要合并的文件并按文件合并的先后进行排序。
    2. 创建合并文件
    3. 依次从合并的文件中读取数据向合并文件写入数
  • 文件合并的测试代码 :
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//将分块合并
    @Test
    public void testMerge() throws IOException {
        //分块文件路径
        File chunkFile =new File("D:\\MyFile\\BUCT\\chunk\\");
        //合并文件
        File mergeFile = new File("D:\\MyFile\\BUCT\\R. Kelly - I Believe I CanFly_高清_2.mp4");
        //源文件
        File sourceFile = new File("D:\\MyFile\\BUCT\\大一\\小学期\\英语语音\\英语语音模仿材料" +
                "\\I believe I can fly\\R. Kelly - I Believe I Can Fly_高清.mp4");
        //获取分块文件并排序
        File[] listFiles = chunkFile.listFiles();
        List<File> files = Arrays.asList(listFiles);
        Collections.sort(files, new Comparator<File>() {
            @Override
            public int compare(File o1, File o2) {
                // o1-o2升序,o2-o1降序
                return Integer.parseInt(o1.getName()) - Integer.parseInt(o2.getName());
            }
        });
        //读取文件先要存储到缓存区
        byte[] bytes = new byte[1024];
        //创建合并文件
        RandomAccessFile rafWriteChunk = new RandomAccessFile(mergeFile,"rw");
        for (File file : files) {
            //读取分块文件的流
            RandomAccessFile rafRead = new RandomAccessFile(file,"rw");
            int length = -1;
            while((length=rafRead.read(bytes))!=-1){
                rafWriteChunk.write(bytes,0,length);
            }
            rafRead.close();
        }
        rafWriteChunk.close();
        //对合并的文件进行校验
        String md5_1 = DigestUtils.md5Hex(new FileInputStream(mergeFile));
        String md5_2 = DigestUtils.md5Hex(new FileInputStream(sourceFile));
        if (md5_1.equals(md5_2)){
            System.out.println("合并成功");
        }else{
            System.out.println("合并失败");
        }
    }

视频上传流程

  • 下图是上传视频的整体流程:

视频上传流程

  1. 前端对文件进行分块。
  2. 前端上传分块文件前请求媒资服务检查文件是否存在,如果已经存在则不再上传。
  3. 如果分块文件不存在则前端开始上传
  4. 前端请求媒资服务上传分块。
  5. 媒资服务将分块上传至MinIO。
  6. 前端将分块上传完毕请求媒资服务合并分块。
  7. 媒资服务判断分块上传完成则请求MinIO合并文件。
  8. 合并完成校验合并后的文件是否完整,如果完整则上传完成,否则删除文件。

minio合并文件测试

  1. 将分块文件上传至minio

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    
    /**
         * 将分块文件上传的minio
         */
        @Test
        public void testUploadChunk() throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
            // 上传文件到minio
            String objectName = "test/chunk/";
            String filePath = "D:\\MyFile\\BUCT\\chunk\\";
            for (int i =0 ;i <3;i++) {
                UploadObjectArgs testbucket = UploadObjectArgs.builder()
                        .bucket("testbucket")  //桶名
                        .object(objectName+i) //文件名 放在子目录下
                        .filename(filePath+i) //文件路径
                        .build();
                minioClient.uploadObject(testbucket);
                System.out.println("上传分片文件成功:"+i);
            }
        }
    
  2. 通过minio的合并文件

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    
     /**
         * 用minio提供的接口合并分片文件
         */
        @Test
        public void testMerge() throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
            List<ComposeSource> sources =new ArrayList<>();
            for (int i=0; i<3 ;i++) {
                ComposeSource composeSource = ComposeSource.builder()
                        .bucket("testbucket")
                        .object("test/chunk/" + i)
                        .build();
                sources.add(composeSource);
            }
    
            ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder()
                    .bucket("testbucket")
                    .object("test/merge/merge01.mp4")
                    .sources(sources)
                    .build();
            //报错: size 1048576 must be greater than 5242880
            minioClient.composeObject(composeObjectArgs);
        }
    

[!NOTE]

  • 使用minio合并文件报错:java.lang.IllegalArgumentException: source testbucket/chunk/0: size 1048576 must be greater than 5242880

  • minio合并文件默认分块最小5M,我们将分块改为5M再次测试。

  • 在nacos中的media-service中增加如下配置

1
2
3
4
5
spring:
  servlet:
    multipart:
      max-file-size: 50MB
      max-request-size: 50MB

接口定义

  • 根据上传视频流程,定义接口,与前端的约定是操作成功返回{code:0}否则返回{code:-1}

  • 从课程资料中拷贝RestResponse.java类到base工程下的model包下。

  • 定义接口如下:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    
    /**
     * @author Mr.M
     * @version 1.0
     */
    @Api(value = "大文件上传接口", tags = "大文件上传接口")
    @RestController
    public class BigFilesController {
    
    
    
        @ApiOperation(value = "文件上传前检查文件")
        @PostMapping("/upload/checkfile")
        public RestResponse<Boolean> checkfile(
                @RequestParam("fileMd5") String fileMd5
        ) throws Exception {
            return null;
        }
    
    
        @ApiOperation(value = "分块文件上传前的检测")
        @PostMapping("/upload/checkchunk")
        public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,
                                                @RequestParam("chunk") int chunk) throws Exception {
           return null;
        }
    
        @ApiOperation(value = "上传分块文件")
        @PostMapping("/upload/uploadchunk")
        public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,
                                        @RequestParam("fileMd5") String fileMd5,
                                        @RequestParam("chunk") int chunk) throws Exception {
    
            return null;
        }
    
        @ApiOperation(value = "合并文件")
        @PostMapping("/upload/mergechunks")
        public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,
                                        @RequestParam("fileName") String fileName,
                                        @RequestParam("chunkTotal") int chunkTotal) throws Exception {
            return null;
    
        }
    
    
    }
    

上传分块开发

DAO开发

向媒资数据库的文件表插入记录,使用自动生成的Mapper接口即可满足要求。

Service开发

检查文件和分块
  • 接口完成进行接口实现,首先实现检查文件方法和检查分块方法,若存在,则无需上传

  • 在MediaFileService中定义service接口如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
 /**
    * @author 闲人指路
    * @description 检查文件是否存在
    * @dateTime 18:20 2024/10/9
    * @param fileMd5 文件的md5
    * @return com.xuecheng.base.model.RestResponse<java.lang.Boolean>
   */
    public RestResponse<Boolean> checkFile(String fileMd5);

    /**
     * @author 闲人指路
     * @description 检查分块文件是否存在
     * @dateTime 18:22 2024/10/9
     * @param fileMd5 文件的md5
     * @param chunkIndex 分块序号
     * @return com.xuecheng.base.model.RestResponse<java.lang.Boolean>
    */
    public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex);
  • service接口实现方法:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
 @Override
    public RestResponse<Boolean> checkFile(String fileMd5) {
        //1.先查询数据库
        MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
        if(mediaFiles==null){
            //数据库中不存在,直接返回
            return RestResponse.success(false);
        }
        //2.如果数据库存在,再查询minio
        return checkMinioFile(mediaFiles.getFilePath(), mediaFiles.getBucket());
    }

    @Override
    public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) {
        //1.拼接分块的存储路径
        String chunkFileFolderPath = getChunkFileFolderPath(fileMd5)+chunkIndex;
        //2.查询minio
        return checkMinioFile(chunkFileFolderPath, bucket_video);
    }

    /**
     * @author 闲人指路
     * @description 查询文件状态
     * @dateTime 18:43 2024/10/9
     * @param chunkFileFolderPath 文件在minio中的路径
     * @param bucket 文件的存储桶
     * @return com.xuecheng.base.model.RestResponse<java.lang.Boolean>
    */
    @NotNull
    private RestResponse<Boolean> checkMinioFile(String chunkFileFolderPath, String bucket) {
        GetObjectArgs getObjectArgs = GetObjectArgs.builder()
                .bucket(bucket)
                .object(chunkFileFolderPath)
                .build();
        try {
            //获取文件信息
            FilterInputStream filterInputStream = minioClient.getObject(getObjectArgs);
            if (filterInputStream != null) {
                //文件已经存在
                return RestResponse.success(true);
            }
        } catch (Exception e) {
            //抛出异常代表文件不存在
            e.printStackTrace();
            return RestResponse.success(false);
        }
        //文件不存在
        return RestResponse.success(false);
    }
/**
     * @author 闲人指路
     * @description 得到分块文件的目录
     * @dateTime 18:35 2024/10/9
     * @return 分块文件的路径
    */
    private String getChunkFileFolderPath(String fileMd5) {
        return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
    }
上传分块
  • 定义service接口
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
     * @author 闲人指路
     * @description 上传分块文件
     * @dateTime 18:50 2024/10/9
     * @param localFilePath 文件的本地存储路径
     * @param fileMd5 文件的md5
     * @param chunk 分块序号
     * @return com.xuecheng.base.model.RestResponse
    */
    RestResponse uploadChunk(String localFilePath, String fileMd5, int chunk);
  • 接口实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 @Override
    public boolean addMediaFiles2MinIo(
            String localFilePath, String mimeType, String bucket, String objectName
    ){
        try {
            UploadObjectArgs uploadObjectArgs = UploadObjectArgs.builder()
                    //桶名
                    .bucket(bucket)
                    //文件名 放在子目录下
                    .object(objectName)
                    //文件路径
                    .filename(localFilePath)
                    //文件类型
                    .contentType(mimeType)
                    .build();
            minioClient.uploadObject(uploadObjectArgs);
            log.debug("上传文件成功,bucket:{},objectName:{}",bucket,objectName);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("上传文件失败,bucket:{},objectName:{},错误信息:{}",bucket,objectName,e.getMessage());
            return false;
        }
    }
 /**
     * @author 闲人指路
     * @description 得到分块文件的目录
     * @dateTime 18:35 2024/10/9
     * @return 分块文件的路径
    */
    private String getChunkFileFolderPath(String fileMd5) {
        return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
    }

    @Override
    public RestResponse uploadChunk(String localFilePath, String fileMd5, int chunk) {
        //1.获取mimeType
        String mimeType = getMimeType(null);
        //2.获取文件存储在minio的路径
        String chunkFileFolderPath = getChunkFileFolderPath(fileMd5) + chunk;
        //3.上传文件到minio
        boolean isSuccess = addMediaFiles2MinIo(localFilePath, mimeType, bucket_video, chunkFileFolderPath);
        if (isSuccess){
            //上传成功
            return RestResponse.success(true);
        }else {
            //上传失败
            return RestResponse.validfail(false,"上传分块文件失败");
        }
    }
上传分块测试
  • 完善接口层:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
    * @author 闲人指路
    * @description 文件上传前检查文件
    * @dateTime 16:32 2024/10/9
    * @param fileMd5 文件Md5值
    * @return com.xuecheng.base.model.RestResponse<java.lang.Boolean>
    */
    @ApiOperation(value = "文件上传前检查文件")
    @PostMapping("/upload/checkfile")
    public RestResponse<Boolean> checkfile(
            @RequestParam("fileMd5") String fileMd5){
        return mediaFileService.checkFile(fileMd5);
    }

    /**
    * @author 闲人指路
    * @description 分块文件上传前的检测
    * @dateTime 16:33 2024/10/9
    * @param fileMd5 文件Md5值
    * @param chunk 该分块文件的序号
    * @return com.xuecheng.base.model.RestResponse<java.lang.Boolean>
    */
    @ApiOperation(value = "分块文件上传前的检测")
    @PostMapping("/upload/checkchunk")
    public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,
                                            @RequestParam("chunk") int chunk){
        return mediaFileService.checkChunk(fileMd5, chunk);
    }

    /**
    * @author 闲人指路
    * @description 上传分块文件
    * @dateTime 16:34 2024/10/9
    * @param file 前端传来的文件
    * @param fileMd5 文件Md5值
    * @param chunk 分块序号
    * @return com.xuecheng.base.model.RestResponse
    */
    @ApiOperation(value = "上传分块文件")
    @PostMapping("/upload/uploadchunk")
    public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,
                                    @RequestParam("fileMd5") String fileMd5,
                                    @RequestParam("chunk") int chunk) throws Exception {
        //1.创建临时文件
        File tempFile = File.createTempFile("minio", "temp");
        //2.将文件转存到本地
        file.transferTo(tempFile);
        //3.获取文件的本地路径
        String localFilePath = tempFile.getAbsolutePath();
        //4.调用service上传分块文件
        return mediaFileService.uploadChunk(localFilePath, fileMd5, chunk);
    }
  • 启动前端工程,进入上传视频界面进行前后端联调测试。

合并分块开发

service开发

  • 定义service接口
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/**
     * @author 闲人指路
     * @description 合并分块
     * @dateTime 20:25 2024/10/9
     * @param companyId companyId
     * @param fileMd5 文件md5
     * @param chunkTotal 分块总个数
     * @param uploadFileParamsDto 文件信息
     * @return com.xuecheng.base.model.RestResponse
    */
    public RestResponse mergechunks(Long companyId,String fileMd5,int chunkTotal,UploadFileParamsDto uploadFileParamsDto);
  • service实现
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
 @Override
    public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {
        //1.找到分块文件调用minio的SDK进行合并
        List<ComposeSource> sources =new ArrayList<>();
        String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
        for (int i=0; i<chunkTotal ;i++) {

            ComposeSource composeSource = ComposeSource.builder()
                    .bucket(bucket_video)
                    .object(chunkFileFolderPath + i)
                    .build();
            sources.add(composeSource);
        }

        //获取源文件名称
        String filename = uploadFileParamsDto.getFilename();
        //获取扩展名
        String extension = filename.substring(filename.lastIndexOf("."));
        String objectName = getFilePathByMd5(fileMd5, extension);
        ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder()
                .bucket(bucket_video)
                .object(objectName)
                .sources(sources)
                .build();
        //报错: size 1048576 must be greater than 5242880
        try {
            minioClient.composeObject(composeObjectArgs);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("合并文件出错,bucket:{},objectName:{},错误信息:{}",bucket_video,objectName,e.getMessage());
            return RestResponse.validfail(false,"合并文件出错");
        }
        //2.进行文件检验
        //先下载文件
        File fileNew = downloadFileFromMinIO(bucket_video, objectName);
        //设置文件的大小
        uploadFileParamsDto.setFileSize(fileNew.length());

        try (FileInputStream fis = new FileInputStream(fileNew)){
            //计算其Md5
            String md5HexNew = DigestUtils.md5Hex(fis);
            //比较md5值
            if(!md5HexNew.equals(fileMd5)){
                log.error("校验合并文件md5值不一致,原始文件:{},合并文件:{},错误信息:{}",fileMd5,md5HexNew);
                return RestResponse.validfail(false,"文件校验失败");
            }
        }catch (Exception e){
            log.error("校验文件出错,bucket:{},objectName:{},错误信息:{}",bucket_video,objectName,e.getMessage());
            return RestResponse.validfail(false,"文件校验失败");
        }
        //3.将文件信息入库
        //用代理对像调用,保证事物
        MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(fileMd5, companyId, uploadFileParamsDto, bucket_video, objectName);
        if(mediaFiles == null){
            return RestResponse.validfail(false,"文件入库失败");
        }
        //4.清理分块文件
        clearChunkFiles(chunkFileFolderPath,chunkTotal);
        return RestResponse.success(true);
    }
@Override
    public File downloadFileFromMinIO(String bucket,String objectName){
        //临时文件
        File minioFile = null;
        FileOutputStream outputStream = null;
        try{
            InputStream stream = minioClient.getObject(GetObjectArgs.builder()
                    .bucket(bucket)
                    .object(objectName)
                    .build());
            //创建临时文件
            minioFile=File.createTempFile("minio", ".merge");
            outputStream = new FileOutputStream(minioFile);
            IOUtils.copy(stream,outputStream);
            return minioFile;
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(outputStream!=null){
                try {
                    outputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }
/**
     * @author 闲人指路
     * @description 清除分块文件
     * @dateTime 20:58 2024/10/9
     * @param chunkFileFolderPath 分块文件路径
     * @param chunkTotal 分块文件总数
     * @return void
    */
    private void clearChunkFiles(String chunkFileFolderPath,int chunkTotal) {

        try {
            List<DeleteObject> deleteObjects = Stream.iterate(0, i -> ++i)
                    .limit(chunkTotal)
                    .map(i -> new DeleteObject(chunkFileFolderPath.concat(Integer.toString(i))))
                    .collect(Collectors.toList());

            RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder().bucket("video").objects(deleteObjects).build();
            Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);
            results.forEach(r -> {
                DeleteError deleteError = null;
                try {
                    deleteError = r.get();
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("清理分块文件失败,objectname:{}", deleteError.objectName(), e);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            log.error("清理分块文件失败,chunkFileFolderPath:{}", chunkFileFolderPath, e);
        }
    }
/**
     * @author 闲人指路
     * @description 得到分块文件的目录
     * @dateTime 18:35 2024/10/9
     * @return 分块文件的路径
    */
    private String getChunkFileFolderPath(String fileMd5) {
        return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
    }

[!CAUTION]

这里为了保证事物的最小性,没有将事物直接写在主方法上,而是写在了被调用的方法上,但是这样就会使事物失效,要使事物有效,就必须用代理对象来调用该事务。

合并分块测试

  • 下边进行前后端联调:

    1. 上传一个视频测试合并分块的执行逻辑,进入service方法逐行跟踪。
    2. 断点续传测试
  • 上传一部分后,停止刷新浏览器再重新上传,通过浏览器日志发现已经上传过的分块不再重新上传

视频处理

视频转码需求

什么是视频编码

  • 视频上传成功后需要对视频进行转码处理。

  • 什么是视频编码?查阅百度百科如下:

所谓视频编码方式就是指通过压缩技术,将原始视频格式的文件转换成另一种视频格式文件的方式。视频流传输中最为重要的编解码标准有国际电联H.261、H.263、H.264,运动静止图像专家组的M-JPEG国际标准化组织运动图像专家组的MPEG系列标准,此外在互联网上被广泛应用的还有Real-Networks的RealVideo微软公司的WMV以及Apple公司的QuickTime等。

  • 首先我们要分清文件格式和编码格式:

    • 文件格式:是指.mp4、.avi、.rmvb等,这些不同扩展名的视频文件的文件格式 ,视频文件的内容主要包括视频和音频,其文件格式是按照一 定的编码格式去编码,并且按照该文件所规定的封装格式将视频、音频、字幕等信息封装在一起,播放器会根据它们的封装格式去提取出编码,然后由播放器解码,最终播放音视频。

    • 音视频编码格式:通过音视频的压缩技术,将视频格式转换成另一种视频格式,通过视频编码实现流媒体的传输。比如:一个.avi的视频文件原来的编码是a,通过编码后编码格式变为b,音频原来为c,通过编码后变为d。

  • 音视频编码格式各类繁多,主要有几下几类:

    • MPEG系列(由ISO[国际标准组织机构]下属的MPEG[运动图象专家组]开发 )

      • 视频编码方面主要是Mpeg1(vcd用的就是它)、Mpeg2(DVD使用)、Mpeg4(的DVDRIP使用的都是它的变种,如:divx,xvid等)、Mpeg4 AVC(正热门);音频编码方面主要是MPEG Audio Layer 1/2、MPEG Audio Layer 3(大名鼎鼎的mp3)、MPEG-2 AAC 、MPEG-4 AAC等等。注意:DVD音频没有采用Mpeg的。
    • H.26X系列(由ITU[国际电传视讯联盟]主导,侧重网络传输,注意:只是视频编码)

      • 包括H.261、H.262、H.263、H.263+、H.263++、H.264(就是MPEG4 AVC-合作的结晶)
  • 目前最常用的编码标准是视频H.264,音频AAC。

FFmpeg 的基本使用

  • 我们将视频录制完成后,使用视频编码软件对视频进行编码,本项目 使用FFmpeg对视频进行编码 。

  • FFmpeg被许多开源项目采用,QQ影音、暴风影音、VLC等。

  • 测试是否正常:cmd运行 ffmpeg -version

FFmpeg运行测试

  • 安装成功,作下简单测试

  • 将一个.avi文件转成mp4、mp3、gif等。

  • 比如我们将nacos.avi文件转成mp4,运行如下命令:D:\DEVELOP\ffmpeg\ffmpeg.exe -i 1.avi 1.mp4这里前面的部分为FFmpeg在你电脑中的位置

  • 也可以将ffmpeg.exe配置到环境变量path中,进入视频目录直接运行:ffmpeg.exe -i 1.avi 1.mp4

  • 转成mp3:ffmpeg -i nacos.avi nacos.mp3

  • 转成gif:ffmpeg -i nacos.avi nacos.gif

  • 官方文档(英文):http://ffmpeg.org/ffmpeg.html

视频处理工具类

  • 将课程资料的工具类中的util拷贝至base工程。
  • 其中Mp4VideoUtil类是用于将视频转为mp4格式,是我们项目要使用的工具类。
  • 下边看下这个类的代码,并进行测试。
  • 我们要通过ffmpeg对视频转码,Java程序调用ffmpeg,使用java.lang.ProcessBuilder去完成,具体在Mp4VideoUtil类的63行,下边进行简单的测试,下边的代码运行本机安装的QQ软件。
1
2
3
4
5
6
//通过java程序打开电脑上的软件示例
        ProcessBuilder builder = new ProcessBuilder();
        builder.command("D:\\Program\\Typora\\Typora.exe");
        //将标准输入流和错误输入流合并,通过标准输入流程读取信息
        builder.redirectErrorStream(true);
        Process p = builder.start();
  • 对Mp4VideoUtil类需要学习使用方法,下边代码将一个avi视频转为mp4视频,如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public static void main(String[] args) throws IOException {
        /*ffmpeg的路径
        ffmpeg的安装位置*/
        String ffmpeg_path = "D:\\DEVELOP\\ffmpeg\\ffmpeg.exe";
        //源avi视频的路径
        String video_path = "D:\\MyFile\\BUCT\\大一\\小学期\\英语语音\\英语语音模仿材料\\Steve.Jobs的演讲.AVI";
        //转换后mp4文件的名称
        String mp4_name = "Steve.Jobs的演讲.mp4";
        //转换后mp4文件的路径
        String mp4_path = "D:\\MyFile\\BUCT\\Steve.Jobs的演讲.mp4";
        //创建工具类对象
        Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);
        //开始视频转换,成功将返回success
        String s = videoUtil.generateMp4();
        System.out.println(s);
    }
  • 执行main方法,最终在控制台输出 success 表示执行成功。

分布式任务处理

什么是分布式任务调度

  • 对一个视频的转码可以理解为一个任务的执行,如果视频的数量比较多,如何去高效处理一批任务呢?

  • 如下有两种方案:

    1. 多线程
      • 多线程是充分利用单机的资源。
    2. 分布式加多线程
      • 充分利用多台计算机,每台计算机使用多线程处理。
  • 方案2可扩展性更强。

  • 方案2是一种分布式任务调度的处理方案。

  • 什么是分布式任务调度?

  • 我们可以先思考一下面业务场景的解决方案:

    • 每隔24小时执行数据备份任务。
    • 12306网站会根据车次不同,设置几个时间点分批次放票。
    • 某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。
    • 商品成功发货后,需要向客户发送短信提醒。
  • 类似的场景还有很多,我们该如何实现?

  • 多线程方式实现:

    • 学过多线程的同学,可能会想到,我们可以开启一个线程,每sleep一段时间,就去检查是否已到预期执行时间。
  • 以下代码简单实现了任务调度的功能:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {    
    //任务执行间隔时间
    final long timeInterval = 1000;
    Runnable runnable = new Runnable() {
        public void run() {
            while (true) {
                //TODO:something
                try {
                    Thread.sleep(timeInterval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    Thread thread = new Thread(runnable);
    thread.start();
}
  • 上面的代码实现了按一定的间隔时间执行任务调度的功能。

  • Jdk也为我们提供了相关支持,如Timer、ScheduledExecutor,下边我们了解下。

  • Timer实现方式

1
2
3
4
5
6
7
8
9
public static void main(String[] args){  
    Timer timer = new Timer();  
    timer.schedule(new TimerTask(){
        @Override  
        public void run() {  
           //TODO:something
        }  
    }, 1000, 2000);  //1秒后开始调度,每2秒执行一次
}
  • Timer 的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行。

  • ScheduledExecutor方式实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public static void main(String [] agrs){
    ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
    service.scheduleAtFixedRate(
            new Runnable() {
                @Override
                public void run() {
                    //TODO:something
                    System.out.println("todo something");
                }
            }, 1,
            2, TimeUnit.SECONDS);
}
  • Java 5 推出了基于线程池设计的 ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。

  • Timer 和 ScheduledExecutor 都仅能提供基于开始时间与重复间隔的任务调度,不能胜任更加复杂的调度需求。比如,设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据等等。

  • 第三方Quartz方式实现,项目地址:https://github.com/quartz-scheduler/quartz

  • Quartz 是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求,Quartz 设计的核心类包括 Scheduler, Job 以及 Trigger。其中,Job 负责定义需要执行的任务,Trigger 负责设置调度策略,Scheduler 将二者组装在一起,并触发任务开始执行。Quartz支持简单的按时间间隔调度、还支持按日历调度方式,通过设置CronTrigger表达式(包括:秒、分、时、日、月、周、年)进行任务调度。

  • 下边是一个例子代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String [] agrs) throws SchedulerException {
    //创建一个Scheduler
    SchedulerFactory schedulerFactory = new StdSchedulerFactory();
    Scheduler scheduler = schedulerFactory.getScheduler();
    //创建JobDetail
    JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
    jobDetailBuilder.withIdentity("jobName","jobGroupName");
    JobDetail jobDetail = jobDetailBuilder.build();
    //创建触发的CronTrigger 支持按日历调度
        CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("triggerName", "triggerGroupName")
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
                .build();
    scheduler.scheduleJob(jobDetail,trigger);
    scheduler.start();
}

public class MyJob implements Job {
    @Override
    public void execute(JobExecutionContext jobExecutionContext){
        System.out.println("todo something");
    }
}
  • 任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。

  • 什么是分布式任务调度?

  • 通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:

分布式任务调度例子

  • 分布式调度要实现的目标:
  • 不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:
    1. 并行任务调度
      • 并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
      • 如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
    2. 高可用
      • 若某一个实例宕机,不影响其他实例来执行任务。
    3. 弹性扩容
      • 当集群中增加实例就可以提高并执行任务的处理效率。
    4. 任务管理与监测
      • 对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
    5. 避免任务重复执行
      • 当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。

XXL-JOB介绍

xxl-job结构

  • 调度中心:

    • 负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;

    • 主要职责为执行器管理、任务管理、监控运维、日志管理等

  • 任务执行器:

    • 负责接收调度请求并执行任务逻辑;

    • 主要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等

  • 任务:负责执行具体的业务处理。

  • 调度中心与执行器之间的工作流程如下:

调度中心与执行器之间的工作流程

  • 执行流程:
    1. 任务执行器根据配置的调度中心的地址,自动注册到调度中心
    2. 达到任务触发条件,调度中心下发任务
    3. 执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
    4. 执行器消费内存队列中的执行结果,主动上报给调度中心
    5. 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

搭建XXL-JOB

调度中心
xxl-job工程目录结构
  • xxl-job-admin:调度中心

  • xxl-job-core:公共依赖

  • xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)

    • xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
    • xxl-job-executor-sample-frameless:无框架版本;
  • doc :文档资料,包含数据库脚本

  • 在下发的虚拟机的MySQL中已经创建了xxl_job_2.3.1数据库

  • 使用虚拟机执行docker start xxl-job-admin自动启动xxl-job调度中心

  • 访问:http://192.168.101.65:8088/xxl-job-admin/

  • 账号和密码:admin/123456

  • 如果无法使用虚拟机运行xxl-job可以在本机idea运行xxl-job调度中心。

执行器
  • 下边配置执行器,执行器负责与调度中心通信接收调度中心发起的任务调度请求。

    1. 下边进入调度中心添加执行器
    2. 点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。
    3. 首先在媒资管理模块的service工程添加依赖,在项目的父工程已约定了版本2.3.1
    1
    2
    3
    4
    
    <dependency>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job-core</artifactId>
    </dependency>
    
    1. 在nacos下的media-service-dev.yaml下配置xxl-job
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    xxl:
      job:
        admin: 
          addresses: http://192.168.101.65:8088/xxl-job-admin
        executor:
          appname: media-process-service
          address: 
          ip: 
          port: 9999
          logpath: /data/applogs/xxl-job/jobhandler
          logretentiondays: 30
        accessToken: default_token
    
    1. 配置xxl-job的执行器
      • 将xxl-job示例工程下配置类拷贝到媒资管理的service工程下
  • 到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器,下边准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动媒资管理模块的接口工程。

  • 启动后观察日志,出现下边的日志表示执行器在调度中心注册成功

xxl-job日志

  • 同时观察调度中心中的执行器界面,在线机器地址处已显示1个执行器。

调度中心执行器界面

执行任务
  • 下边编写任务,参考示例工程中任务类的编写方法。

  • 在媒资服务service包下新建jobhandler存放任务类,下边参考示例工程编写一个任务类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Component
 @Slf4j
public class SampleJob {
 /**
  * 1、简单任务示例(Bean模式)
  */
 @XxlJob("testJob")
 public void testJob() throws Exception {
  log.info("开始执行.....");

 }
}
  • 下边在调度中心添加任务,进入任务管理

  • 点击新增,填写任务信息

    • 调度类型:

      • 固定速度指按固定的间隔定时调度。

      • Cron,通过Cron表达式实现更丰富的定时调度策略。

        • Cron表达式是一个字符串,通过它可以定义调度策略,格式如下:

        • {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}

        • xxl-job提供图形界面去配置

    • 运行模式:有BEAN和GLUE

      • bean模式较常用就是在项目工程中编写执行器的任务代码
      • GLUE是将任务代码编写在调度中心。
    • JobHandler即任务方法名,填写任务方法上@XxlJob注解中的名称。

    • 路由策略:当执行器集群部署时,调度中心向哪个执行器下发任务,这里选择第一个表示只向第一个执行器下发任务,路由策略的其它选项稍后在分片广播章节详细解释。

  • 添加成功,启动任务

  • 通过调度日志查看任务执行情况

  • 下边启动媒资管理的service工程,启动执行器。

  • 观察执行器方法的执行。

  • 如果要停止任务需要在调度中心操作

  • 任务跑一段时间注意清理日志

分片广播

  • 掌握了xxl-job的基本使用,下边思考如何进行分布式任务处理呢?如下图,我们会启动多个执行器组成一个集群,去执行任务。

执行器集群

  • 执行器在集群部署下调度中心有哪些路由策略呢?
  • 查看xxl-job官方文档,阅读高级配置相关的内容:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
高级配置:
    - 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
        FIRST(第一个):固定选择第一个机器;
        LAST(最后一个):固定选择最后一个机器;
        ROUND(轮询):;
        RANDOM(随机):随机选择在线的机器;
        CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
        LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
        LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
        FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
        BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
        SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

    - 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。
    - 调度过期策略:
        - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
        - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
    - 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
        单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
        丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
        覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
    - 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
    - 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
  • 下边要重点说的是分片广播策略,分片是指是调度中心以执行器为维度进行分片,将集群中的执行器标上序号:0,1,2,3…,广播是指每次调度会向集群中的所有执行器发送任务调度,请求中携带分片参数。

分片广播策略

  • 每个执行器收到调度请求同时接收分片参数。

  • xxl-job支持动态扩容执行器集群从而动态增加分片数量,当有任务量增加可以部署更多的执行器到集群中,调度中心会动态修改分片的数量。

  • 作业分片适用哪些场景呢?

    • 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;

    • 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。

  • 所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。

  • 使用说明:

    • “分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
  • Java语言任务获取分片参数方式:

    • BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务"ShardingJobHandler”:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
 * 2、分片广播任务
 */
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
    // 分片序号,从0开始
    int shardIndex = XxlJobHelper.getShardIndex();
    // 分片总数
    int shardTotal = XxlJobHelper.getShardTotal();
    ....

视频处理技术方案

作业分片方案

  • 掌握了xxl-job的分片广播调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。

  • 任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会查询到重复的任务呢?

  • XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号,在向执行器任务调度的同时下发分片总数以及分片序号等参数,执行器收到这些参数根据自己的业务需求去利用这些参数。

  • 下图表示了多个执行器获取视频处理任务的结构:

多个执行器获取任务

  • 每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上分片总数,如果等于分片序号则执行此任务。

  • 上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:

    • 1 % 2 = 1 执行器2执行
    • 2 % 2 = 0 执行器1执行
    • 3 % 2 = 1 执行器2执行
    • 以此类推

保证任务不重复执行

  • 通过作业分片方案保证了执行器之间查询到不重复的任务,如果一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?

  • 首先配置调度过期策略:

  • 查看文档如下:

    • 调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等;

      • 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;

      • 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;

      • 这里我们选择忽略,如果立即执行一次就可能重复执行相同的任务。

  • 其次,再看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束时调度中心进行任务调度,此时该如何处理。

  • 查看文档如下:

    • 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;

    • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;

    • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;

    • 这里如果选择覆盖之前调度则可能重复执行任务,这里选择丢弃后续调度或单机串行方式来避免任务重复执行。

  • 只做这些配置可以保证任务不会重复执行吗?

  • 做不到,还需要保证任务处理的幂等性。

  • 什么是幂等性?

  • 它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。

  • 幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。

  • 解决幂等性常用的方案:

    1. 数据库约束,比如:唯一索引,主键。
    2. 乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
    3. 唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
  • 基于以上分析,在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性,要有办法去判断该视频是否处理完成,如果正在处理中或处理完则不再处理。这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。

视频处理方案

  • 确定了分片方案,下边梳理整个视频上传及处理的业务流程。

视频上传和处理的流程

  • 上传视频成功向视频处理待处理表添加记录。

  • 视频处理的详细流程如下:

视频处理流程

  1. 任务调度中心广播作业分片。
  2. 执行器收到广播作业分片,从数据库读取待处理任务,读取未处理及处理失败的任务。
  3. 执行器更新任务为处理中,根据任务内容从MinIO下载要处理的文件。
  4. 执行器启动多线程去处理任务。
  5. 任务处理完成,上传处理后的视频到MinIO。
  6. 将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。

查询待处理任务

需求分析

  • 查询待处理任务只处理未提交及处理失败的任务,任务处理失败后进行重试,最多重试3次。

  • 任务处理成功将待处理记录移动到历史任务表。

  • 下图是待处理任务表:

待处理任务表

  • 历史任务表与待处理任务表的结构相同。

添加待处理任务

  • 上传视频成功向视频处理待处理表添加记录,暂时只添加对avi视频的处理记录。
  • 根据MIME Type去判断是否是avi视频,下边列出部分MIME Type
Video Type Extension MIME Type
MPEG-4 Video .mp4 video/mp4
QuickTime Video .mov video/quicktime
AVI Video .avi video/x-msvideo
WMV Video .wmv video/x-ms-wmv
Flash Video .flv video/x-flv
Ogg Video .ogv video/ogg
WebM Video .webm video/webm
3GP Video .3gp video/3gpp
3G2 Video .3g2 video/3gpp2
AVCHD Video .mts video/avchd
MPEG Video .mpe video/mpeg
DVD Video .vob video/dvd
  • avi视频的MIME Type是video/x-msvideo

  • 修改文件信息入库方法,如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
 /**
     * 将文件信息保存到数据库
     * @param fileMd5
     * @param companyId
     * @param uploadFileParamsDto
     * @param bucket
     * @param objectName
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public MediaFiles addMediaFilesToDb(
            String fileMd5, Long companyId, UploadFileParamsDto uploadFileParamsDto,
            String bucket, String objectName) {
        //1.根据id查询文件,判断是否已经保存
        MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
        if(mediaFiles != null){
            return mediaFiles;
        }
        //2.文件不存在保存到数据库
        //3.拷贝信息
        mediaFiles = new MediaFiles();
        BeanUtils.copyProperties(uploadFileParamsDto,mediaFiles);
        //4.设置信息
        mediaFiles.setId(fileMd5);
        mediaFiles.setCompanyId(companyId);
        mediaFiles.setBucket(bucket);
        mediaFiles.setFilePath(objectName);
        mediaFiles.setFileId(fileMd5);
        mediaFiles.setUrl("/"+bucket+"/"+objectName);
        mediaFiles.setCreateDate(LocalDateTime.now());
        mediaFiles.setStatus("1");
        mediaFiles.setAuditStatus("002003");
        //将文件信息存储到数据库
        int insert = mediaFilesMapper.insert(mediaFiles);
        if(insert<=0){
            log.error("保存文件信息到数据库失败,{}",mediaFiles.toString());
            XueChengPlusException.cast("保存文件信息到数据库失败");
            return null;
        }
        //记录待处理任务
        addWaitingTask(mediaFiles);

        log.debug("保存文件信息到数据库成功,{}",mediaFiles.toString());
        return mediaFiles;
    }


    /**
     * @author 闲人指路
     * @description 添加待处理任务
     * @dateTime 16:08 2024/10/11
     * @param mediaFiles 媒资文件信息
     * @return void
    */
    private void addWaitingTask(MediaFiles mediaFiles){
        //1.获取文件名
        String filename = mediaFiles.getFilename();
        //2.获取文件后缀
        String extension = filename.substring(filename.lastIndexOf("."));
        //3.根据后缀获取mime-type
        String mimeType = getMimeType(extension);
        //4.判断是否是要被处理的类型
        if(!mimeType.equals("video/x-msvideo")){
            //不是,直接返回
            return;
        }
        //4.新建一个MediaProcess对象
        MediaProcess mediaProcess = new MediaProcess();
        BeanUtils.copyProperties(mediaFiles,mediaProcess);
        //设置状态为待处理
        mediaProcess.setStatus("1");
        //设置上传时间为当前时间
        mediaProcess.setCreateDate(LocalDateTime.now());
        //设置url为null
        mediaProcess.setUrl(null);
        //5.将MediaProcess对象插入到数据库
        mediaProcessMapper.insert(mediaProcess);
    }
  • 进行前后端测试,上传4个avi视频,观察待处理任务表是否存在记录,记录是否完成。

查询待处理任务

  • 如何保证查询到的待处理视频记录不重复?

  • 编写根据分片参数获取待处理任务的DAO方法,在MediaProcessMapper中定义DAO接口如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/**
     * @author 闲人指路
     * @description 根据分片参数查询待处理任务
     * @dateTime 20:05 2024/10/11
     * @param shardIndex 当前分片序号
     * @param shardTotal 总共分片数
     * @param count 需要获取的任务数
     * @return java.util.List<com.xuecheng.media.model.po.MediaProcess>
    */
    @Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status = '1' or t.status = '3') and t.fail_count < 3 limit #{count}")
    List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal,@Param("shardIndex") int shardIndex,@Param("count") int count);
  • 定义Service接口,查询待处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
     * 获取待处理任务
     * @author 闲人指路
     * @dateTime 20:07 2024/10/11
     * @param shardIndex 当前分片号
     * @param shardTotal 总分片数
     * @param count 获取的任务数
     * @return java.util.List<com.xuecheng.media.model.po.MediaProcess>
     */
    public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count);
  • service接口实现
1
2
3
4
@Override
    public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {
        return mediaProcessMapper.selectListByShardIndex(shardIndex,shardTotal,count);
    }

开始执行任务

分布式锁

  • 前边分析了保证任务不重复执行的方案,理论上每个执行器分到的任务是不重复的,但是当在执行器弹性扩容时无法绝对避免任务不重复执行,比如:原来有四个执行器正在执行任务,由于网络问题原有的0、1号执行器无法与调度中心通信,调度中心就会对执行器重新编号,原来的3、4执行器可能就会执行和0、1号执行器相同的任务。

  • 为了避免多线程去争抢同一个任务可以使用synchronized同步锁去解决,如下代码:

1
2
3
synchronized(锁对象){
   执行任务...
}
  • synchronized只能保证同一个虚拟机中多个线程去争抢锁。

synchronized锁

  • 如果是多个执行器分布式部署,并不能保证同一个视频只有一个执行器去处理。

  • 现在要实现分布式环境下所有虚拟机中的线程去同步执行就需要让多个虚拟机去共用一个锁,虚拟机可以分布式部署,锁也可以分布式部署,如下图:

分布式锁

  • 虚拟机都去抢占同一个锁,锁是一个单独的程序提供加锁、解锁服务。

  • 该锁已不属于某个虚拟机,而是分布式部署,由多个虚拟机所共享,这种锁叫分布式锁。

  • 实现分布式锁的方案有很多,常用的如下:

    1. 基于数据库实现分布锁

      • 利用数据库主键唯一性的特点,或利用数据库唯一索引、行级锁的特点,比如:多个线程同时向数据库插入主键相同的同一条记录,谁插入成功谁就获取锁,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。
    2. 基于redis实现锁

      • redis提供了分布式锁的实现方案,比如:SETNX、set nx、redisson等。

      • 拿SETNX举例说明,SETNX命令的工作过程是去set一个不存在的key,多个线程去设置同一个key只会有一个线程设置成功,设置成功的的线程拿到锁。

    3. 使用zookeeper实现

      • zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。
  • 本次选用数据库实现分布锁,后边的模块会选用其它方案到时再详细介绍。

开启任务

  • 下边基于数据库方式实现分布锁,开始执行任务将任务执行状态更新为4表示任务执行中。

  • 下边的sql语句可以实现更新操作:

1
update media_process m set m.status='4' where  m.id=?
  • 如果是多个线程去执行该sql都将会执行成功,但需求是只能有一个线程抢到锁,所以此sql无法满足需求。

  • 使用乐观锁方式实现更新操作:

1
update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?
  • 多个线程同时执行上边的sql只会有一个线程执行成功。

  • 什么是乐观锁、悲观锁?

    • synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。

    • 乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试。

    • 数据库的乐观锁实现方式是在表中增加一个version字段,更新时判断是否等于某个版本,等于则更新否则更新失败,如下方式。

1
update t1 set t1.data1 = '',t1.version='2' where t1.version='1'
  • 实现如下:

  • 在MediaProcessMapper中定义mapper

1
2
3
4
5
6
7
8
/**
     * 获取任务锁(利用数据库的乐观锁实现分布式锁)
     * @author 闲人指路
     * @param id
     * @return
     */
    @Update("UPDATE media_process SET status='4' where (status = 1 or status = 3) and fail_count < 3 and id = #{id}")
    int startTask(@Param("id") Long id);
  • 在MediaFileProcessService中定义接口
1
2
3
4
5
6
7
 /**
     * 获取任务的分布式锁
     * @author 闲人指路
     * @param id
     * @return
     */
    public boolean startTask(Long id);

更新任务状态

  • 任务处理完成需要更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录。

  • 在MediaFileProcessService接口添加方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
 /**
     * 保存任务结果
     * @param taskId  任务id
     * @param status 任务状态
     * @param fileId  文件id
     * @param url url
     * @param errorMsg 错误信息
     * @return void
     * @author 闲人指路
     */
    void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);
  • service接口方法实现如下
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
 @Override
    public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {
        //1.根据任务id获取待处理任务
        MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);
        //2.判断该任务是否存在
        if(mediaProcess== null){
            //2.1不存在,直接返回
            return ;
        }
        //3.判断任务执行是否成功
        if("3".equals(status)){
            //4.任务执行失败
            //4.1直接更新任务状态为失败,增加一次失败次数,更新失败原因
            mediaProcess.setStatus(status);
            mediaProcess.setFailCount(mediaProcess.getFailCount()+1);
            mediaProcess.setErrormsg(errorMsg);
            LambdaQueryWrapper<MediaProcess> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(MediaProcess::getId,taskId);
            mediaProcessMapper.update(mediaProcess,queryWrapper);
            //4.2 直接返回
            return ;

        }
        //5.任务执行成功
        //5.1更新媒资表的url为mp4视频的url
        MediaFiles mediaFiles = new MediaFiles();
        mediaFiles.setId(fileId);
        mediaFiles.setUrl(url);
        LambdaQueryWrapper<MediaFiles> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(MediaFiles::getId,fileId);
        mediaFilesMapper.update(mediaFiles,queryWrapper);
        //5.2删除待处理表中的这一条任务
        mediaProcessMapper.deleteById(taskId);
        //5.3将该任务插入历史任务表
        MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();
        BeanUtils.copyProperties(mediaProcess,mediaProcessHistory);
        //5.3.1 更新任务状态为成功
        mediaProcessHistory.setStatus(status);
        //5.3.2 更新完成时间
        mediaProcessHistory.setFinishDate(LocalDateTime.now());
        //5.3.3 更新url
        mediaProcessHistory.setUrl(url);
        //5.3.4 插入历史任务表
        mediaProcessHistoryMapper.insert(mediaProcessHistory);
    }

视频处理

  • 视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数。

  • 所有视频处理完成结束本次执行,为防止代码异常出现无限期等待则添加超时设置,到达超时时间还没有处理完成仍结束任务。

  • 定义任务类VideoTask 如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/**
 * 视频处理任务类
 * @author 闲人指路
 */
@Slf4j
@Component
public class VideoTask {

    @Autowired
    private MediaFileProcessService mediaFileProcessService;
    @Autowired
    private MediaFileService mediaFileService;
    //ffmpeg在本机的路径
    @Value("${videoprocess.ffmpegpath}")
    private String ffmpeg_path;


    /**
     * 分片广播任务
     */
    @XxlJob("VideoJobHandler")
    public void shardingJobHandler() throws Exception {
        // 1.分片参数
        //1.1执行器的序号,从0开始
        int shardIndex = XxlJobHelper.getShardIndex();
        //1.2执行器的总数
        int shardTotal = XxlJobHelper.getShardTotal();
        //确定cpu的核心数
        int processors = Runtime.getRuntime().availableProcessors();
        //2.查询待处理任务
        List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardTotal, shardIndex, processors);
        //3.获取任务数量,以创建线程池
        int size = mediaProcessList.size();
        log.debug("取到任务的数量:{}",size);
        if(size<=0){
            return ;
        }
        //4.创建一个线程池,用于执行视频转码任务
        ExecutorService executorService = Executors.newFixedThreadPool(size);
        //5.遍历任务
        //使用计数器
        CountDownLatch countDownLatch=new CountDownLatch(size);
        mediaProcessList.forEach(mediaProcess -> {
            //6.将每一个任务都加入线程池
            executorService.execute(()->{
                try {
                    //执行任务逻辑
                    Long taskId = mediaProcess.getId();
                    String fileId = mediaProcess.getFileId();
                    //3.获取任务的分布式锁
                    boolean isSuccess = mediaFileProcessService.startTask(taskId);
                    //判断获取是否成功
                    if(!isSuccess){
                        //获取失败,该任务已经被执行或者超过失败次数上限,直接返回
                        return ;
                    }
                    //4.执行视频转码任务
                    String bucket = mediaProcess.getBucket();
                    String objectName = mediaProcess.getFilePath();
                    //下载minio的视频到本地
                    File file = mediaFileService.downloadFileFromMinIO(bucket, objectName);
                    //判断是否为空
                    if(file==null){
                        log.error("下载视频出错了,任务id:{},bucket:{},objectName:{}",taskId,bucket,objectName);
                        mediaFileProcessService.saveProcessFinishStatus(taskId,"3",fileId,null,"下载视频失败");
                        return ;
                    }
                    //源avi视频的路径
                    String video_path = file.getAbsolutePath();
                    //转换后mp4文件的名称
                    String mp4_name = fileId+".mp4";
                    //转换后mp4文件的路径
                    //创建一个临时文件
                    File tempFile = null;
                    try {
                        tempFile = File.createTempFile("minio", ".mp4");
                    } catch (IOException e) {
                        log.error("创建临时文件异常:{}",e.getMessage());
                        mediaFileProcessService.saveProcessFinishStatus(taskId,"3",fileId,null,"创建临时文件失败");
                        return ;
                    }
                    String mp4_path = tempFile.getAbsolutePath();
                    //创建工具类对象
                    Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);
                    //开始视频转换,成功将返回success
                    String result = videoUtil.generateMp4();
                    if(!"success".equals(result)){
                        //转码失败,保存失败详细
                        log.error("视频转码失败,任务id:{},bucket:{},objectName:{},error:{}",taskId,mediaProcess.getBucket(),mediaProcess.getFilePath(),result);
                        mediaFileProcessService.saveProcessFinishStatus(taskId,"3",fileId,null,"视频转码失败");
                        return ;
                    }
                    //5.将得到的mp4视频上传到minio
                    boolean addMediaFiles2MinIo = mediaFileService.addMediaFiles2MinIo(mp4_path, "video/mp4", bucket, objectName);
                    if (!addMediaFiles2MinIo){
                        log.error("上传视频失败,任务id:{},bucket:{},objectName:{}",taskId,bucket,objectName);
                        mediaFileProcessService.saveProcessFinishStatus(taskId,"3",fileId,null,"上传视频失败");
                        return ;
                    }
                    //6.调用方法,保存任务的处理结果
                    String url = getFilePath(fileId, ".mp4");
                    mediaFileProcessService.saveProcessFinishStatus(taskId,"2",fileId,url,null);
                } finally {
                    //只要一个线程结束,计数器就减一,不管执行成功或失败
                    countDownLatch.countDown();
                }

            });
        });
        //阻塞,指定最大限度的等待时间,这里最多等待30分钟
        countDownLatch.await(30, TimeUnit.MINUTES);

    }

    /**
     * 拼接url
     * @param fileMd5
     * @param fileExt
     * @return
     */
    private String getFilePath(String fileMd5,String fileExt){
        return   fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;
    }

测试

基本测试

  • 进入xxl-job调度中心添加执行器和视频处理任务

  • 在xxl-job配置任务调度策略:

    1. 配置阻塞处理策略为:丢弃后续调度。
    2. 配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些,如:5分钟,即使到达调度时间如果视频没有处理完会丢弃调度请求。
  • 配置完成开始测试视频处理:

    1. 首先上传至少4个视频,非mp4格式。
    2. 在xxl-job启动视频处理任务
    3. 观察媒资管理服务后台日志

失败测试

  1. 先停止调度中心的视频处理任务。
  2. 上传视频,手动修改待处理任务表中file_path字段为一个不存在的文件地址
  3. 启动任务
  • 观察任务处理失败后是否会重试,并记录失败次数。

抢占任务测试

  1. 修改调度中心中视频处理任务的阻塞处理策略为“覆盖之间的调度”
  2. 在抢占任务代码处打断点并选择支持多线程方式
  3. 在抢占任务代码处的下边两行代码分别打上断点,避免观察时代码继续执行。
  4. 启动任务
    • 此时多个线程执行都停留在断点处
    • 依次放行,观察同一个任务只会被一个线程抢占成功。

其他问题

任务补偿机制

  • 如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。

  • 单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。

  • 任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。

达到最大失败次数

  • 当任务达到最大失败次数时一般就说明程序处理此视频存在问题,这种情况就需要人工处理,在页面上会提示失败的信息,人工可手动执行该视频进行处理,或通过其它转码工具进行视频转码,转码后直接上传mp4视频。

分块文件清理问题

  • 上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?
    1. 在数据库中有一张文件表记录minio中存储的文件信息。
    2. 文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。
    3. 当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。

绑定媒资

需求分析

业务流程

  • 到目前为止,媒资管理已完成文件上传、视频处理、我的媒资功能等基本功能,其它模块可以使用媒资文件,本节要讲解课程计划绑定媒资文件。

  • 如何将课程计划绑定媒资呢?

  • 首先进入课程计划界面,然后选择要绑定的视频进行绑定即可。

  • 具体的业务流程如下:

    1. 教育机构用户进入课程管理页面并编辑某一个课程,在"课程大纲"标签页的某一小节后可点击”添加视频“。
    2. 弹出添加视频对话框,可通过视频关键字搜索已审核通过的视频媒资。
    3. 选择视频媒资,点击提交按钮,完成课程计划绑定媒资流程。

数据模型

  • 课程计划绑定媒资文件后存储至课程计划绑定媒资表

课程计划媒资关系表

接口定义

  • 根据业务流程,用户进入课程计划列表,首先确定向哪个课程计划添加视频,点击”添加视频“后用户选择视频,选择视频,点击提交,前端以json格式请求以下参数:

    • 提交媒资文件id、文件名称、教学计划id

    • 示例如下:

1
2
3
4
5
{
  "mediaId": "70a98b4a2fffc89e50b101f959cc33ca",
  "fileName": "22-Hmily实现TCC事务-开发bank2的confirm方法.avi",
  "teachplanId": 257
}
  • 此接口在内容管理模块提供。

  • 在内容管理模块定义请求参数模型类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14

@Data
@ApiModel(value="BindTeachplanMediaDto", description="教学计划-媒资绑定提交数据")
public class BindTeachplanMediaDto {

@ApiModelProperty(value = "媒资文件id", required = true)
private String mediaId;

@ApiModelProperty(value = "媒资文件名称", required = true)
private String fileName;

 @ApiModelProperty(value = "课程计划标识", required = true)
 private Long teachplanId;
}
  • TeachplanController类中定义接口如下:
1
2
3
4
5
@ApiOperation(value = "课程计划和媒资信息绑定")
@PostMapping("/teachplan/association/media")
public void associationMedia(@RequestBody BindTeachplanMediaDto bindTeachplanMediaDto){

}

接口开发

DAO开发

  • 使用teachplanMedia表自动生成的Mapper即可。

Service开发

  • TeachplanService中根据需求定义service接口
1
2
3
4
5
/**
     * 绑定媒资和课程计划
     * @param bindTeachplanMediaDto
     */
    void associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto);
  • 定义接口实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 @Override
    public void associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto) {
        //根据课程计划id查询课程计划,判断其是否存在,且为二级计划
        Long teachplanId = bindTeachplanMediaDto.getTeachplanId();
        Teachplan teachplan = teachplanMapper.selectById(teachplanId);
        if(teachplan==null||teachplan.getGrade()!=2){
            XueChengPlusException.cast("课程计划不存在或课程计划为一级课程计划,不允许绑定媒资");
        }
        //1.首先获取课程计划id,根据课程计划id删除该课程已绑定的媒资信息
        LambdaQueryWrapper<TeachplanMedia> queryWrapper=new LambdaQueryWrapper<>();
        queryWrapper.eq(TeachplanMedia::getTeachplanId,teachplanId);
        teachplanMediaMapper.delete(queryWrapper);
        //2.创建课程计划-媒资信息对象,并设置对象的值
        TeachplanMedia teachplanMedia = new TeachplanMedia();
        BeanUtils.copyProperties(bindTeachplanMediaDto,teachplanMedia);
        teachplanMedia.setMediaFilename(bindTeachplanMediaDto.getFileName());
        //获取课程id
        teachplanMedia.setCourseId(teachplan.getCourseId());
        teachplanMedia.setCreateDate(LocalDateTime.now());
        teachplanMediaMapper.insert(teachplanMedia);
        //3.将对象插入数据库
        int insert = teachplanMediaMapper.insert(teachplanMedia);
        if(insert<1){
            XueChengPlusException.cast("绑定媒资失败");
        }
    }

接口层完善

  • 完善接口层调用Service层的代码
1
2
3
4
5
6
7
8
9
/**
     * 绑定媒资和课程计划
     * @param bindTeachplanMediaDto
     */
    @PostMapping("/teachplan/association/media")
    @ApiOperation("课程计划绑定媒资")
    public void associationMedia(@RequestBody BindTeachplanMediaDto bindTeachplanMediaDto){
        teachplanService.associationMedia(bindTeachplanMediaDto);
    }

接口测试

  • 使用httpclient测试
1
2
3
4
5
6
7
8
9
### 课程计划绑定视频
POST {{media_host}}/media/teachplan/association/media
Content-Type: application/json

{
  "mediaId": "",
  "fileName": "",
  "teachplanId": ""
}
  • 前后端联调

    • 此功能较为简单推荐直接前后端联调

    • 向指定课程计划添加视频即可

解除绑定媒资

需求分析

业务流程

  • 点击已经绑定的视频名称即可解除绑定。

数据模型

  • 课程计划接触绑定媒资文件后要删除在课程计划绑定媒资表中的该条数据

课程计划媒资关系表

接口定义

  • 接口定义如下:
1
2
3
delete /teachplan/association/media/{teachPlanId}/{mediaId}

返回200状态码表示成功。
  • Controller层接口如下
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
 /**
     * 删除课程计划绑定媒资
     * @param teachplanId
     * @param mediaId
     */
    @DeleteMapping("teachplan/association/media/{teachplanId}/{mediaId}")
    @ApiOperation("删除课程计划绑定媒资")
    public void deleteTeachplanMedia(@PathVariable("teachplanId") Long teachplanId,@PathVariable("mediaId") String mediaId){
       
    }

接口开发

DAO开发

  • 直接使用自动生成的mapper即可

Service开发

  • service接口定义
1
2
3
4
5
6
/**
     * 删除课程计划绑定媒资
     * @param teachplanId
     * @param mediaId
     */
    void deleteTeachplanMedia(Long teachplanId, String mediaId);
  • service接口实现
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Override
    public void deleteTeachplanMedia(Long teachplanId, String mediaId) {
        //1.首先根据课程计划id查询课程计划,判断其是否存在
        Teachplan teachplan = teachplanMapper.selectById(teachplanId);
        if(teachplan==null){
            //2.不存在,直接返回
            XueChengPlusException.cast("课程计划不存在");
        }
        //3.存在,根据课程计划id和媒资id删除课程计划媒资信息表
        LambdaQueryWrapper<TeachplanMedia> queryWrapper=new LambdaQueryWrapper<>();
        queryWrapper.eq(TeachplanMedia::getTeachplanId,teachplanId)
                .eq(TeachplanMedia::getMediaId,mediaId);
        int delete = teachplanMediaMapper.delete(queryWrapper);
        if (delete<1){
            XueChengPlusException.cast("删除课程计划媒资信息失败");
        }
    }

接口层完善

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
 /**
     * 删除课程计划绑定媒资
     * @param teachplanId
     * @param mediaId
     */
    @DeleteMapping("teachplan/association/media/{teachplanId}/{mediaId}")
    @ApiOperation("删除课程计划绑定媒资")
    public void deleteTeachplanMedia(@PathVariable("teachplanId") Long teachplanId,@PathVariable("mediaId") String mediaId){
        teachplanService.deleteTeachplanMedia(teachplanId,mediaId);
    }

接口测试

  • 开发完成使用httpclient测试、前后端联调,此处直接前后端联调即可
1
2
### 课程计划接触视频绑定
DELETE {{media_host}}/media/teachplan/association/media/{teachPlanId}/{mediaId}
本博客已稳定运行
发表了4篇文章 · 总计85.59k字
·