本文描述了黑马springcloud项目学成在线的媒资管理模块的搭建
模块需求分析
模块介绍
- 媒资管理系统是每个在线教育平台所必须具备的,查阅百度百科对它的定义如下:
媒体资源管理(Media Asset Management,MAM)系统是建立在多媒体、网络、数据库和数字存储等先进技术基础上的一个对各种媒体及内容(如视/音频资料、文本文件、图表等)进行数字化存储、管理以及应用的总体解决方案,包括数字媒体的采集、编目、管理、传输和编码转换等所有环节。其主要是满足媒体资源拥有者收集、保存、查找、编辑、发布各种信息的要求,为媒体资源的使用者提供访问内容的便捷方法,实现对媒体资源的高效管理,大幅度提高媒体资源的价值。
-
每个教学机构都可以在媒资系统管理自己的教学资源,包括:视频、教案等文件。
-
目前媒资管理的主要管理对象是视频、图片、文档等,包括:媒资文件的查询、文件上传、视频处理等。
-
主要的功能如下:
- 媒资查询:教学机构查询自己所拥有的媒资信息。
- 文件上传:包括上传图片、上传文档、上传视频。
- 视频处理:视频上传成功,系统自动对视频进行编码处理。
- 文件删除:教学机构删除自己上传的媒资文件。
-
下图是课程编辑与发布的整体流程,通过下图可以看到媒资管理在整体流程的位置:

业务流程
上传图片
- 教学机构人员在课程信息编辑页面上传课程图片,课程图片统一记录在媒资管理系统。
上传视频
- 教学机构人员进入媒资管理列表查询自己上传的媒资文件。点击“媒资管理”,进入媒资管理列表页面查询本机构上传的媒资文件。
- 教育机构用户在"媒资管理"页面中点击 “上传视频” 按钮。点击“上传视频”打开上传页面。
- 选择要上传的文件,自动执行文件上传,视频上传成功会自动处理。
处理视频
- 对需要转码处理的视频系统会自动对其处理,处理后生成视频的URL。
- 处理视频没有用户界面,完全是后台自动执行。
审核媒资
- 审核媒资包括程序自动审核和人工审核,程序可以通过鉴黄接口https://www.aliyun.com/product/lvwang?spm=5176.19720258.J_3207526240.51.e93976f4rSq796审核视频,对有异议的视频由人工进行审核。
- 执行步骤
- 运营用户登入运营平台并进入媒资管理页面,查找待审核媒资
- 点击列表中媒资名称链接,可预览该媒资,若是视频,则播放视频。
- 点击列表中某媒资后的"审核" 按钮,开始审核。
- 选择审核结果,输入审核意见,完成媒资的审批过程。
绑定媒资
- 课程计划创建好后需要绑定媒资文件,比如:如果课程计划绑定了视频文件,进入课程在线学习界面后,点课程计划名称则在线播放视频。
- 如何将课程计划绑定媒资呢?
- 教育机构用户进入课程管理页面并编辑某一个课程,在"课程大纲"标签页的某一小节后可点击”添加视频“。
- 弹出添加视频对话框,可通过视频关键字搜索已审核通过的视频媒资。
数据模型
-
本模块媒资文件相关的数据表如下:
- 媒资文件表
- 待处理视频表
- 视频处理历史表
-
媒资文件与课程计划绑定关系表如下:
搭建模块环境
架构的问题分析
- 当前要开发的是媒资管理服务,目前为止共三个微服务:内容管理、系统管理、媒资管理,如下图:

-
后期还会添加更多的微服务,当前这种由前端直接请求微服务的方式存在弊端:
- 如果在前端对每个请求地址都配置绝对路径,非常不利于系统维护,比如下边代码中请求系统管理服务的地址使用的是localhost
- 当系统上线后这里需要改成公网的域名,如果这种地址非常多则非常麻烦。
- 基于这个问题可以采用网关来解决,如下图:
- 这样在前端的代码中只需要指定每个接口的相对路径,如下所示:
- 在前端代码的一个固定的地方在接口地址前统一加网关的地址,每个请求统一到网关,由网关将请求转发到具体的微服务。
-
为什么所有的请求先到网关呢?
- 有了网关就可以对请求进行路由,路由到具体的微服务,减少外界对接微服务的成本,比如:400电话,路由则可以根据请求路径进行路由、根据host地址进行路由等, 当微服务有多个实例时可以通过负载均衡算法进行路由,如下:
- 另外,网关还可以实现权限控制、限流等功能。
-
项目采用Spring Cloud Gateway作为网关,网关在请求路由时需要知道每个微服务实例的地址,项目使用Nacos作用服务发现中心和配置中心,整体的架构图如下:
-
流程如下:
- 微服务启动,将自己注册到Nacos,Nacos记录了各微服务实例的地址。
- 网关从Nacos读取服务列表,包括服务名称、服务地址等。
- 请求到达网关,网关将请求路由到具体的微服务。
-
要使用网关首先搭建Nacos,Nacos有两个作用:
-
服务发现中心。
- 微服务将自身注册至Nacos,网关从Nacos获取微服务列表。
-
配置中心。
- 微服务众多,它们的配置信息也非常复杂,为了提供系统的可维护性,微服务的配置信息统一在Nacos配置。
-
搭建Nacos
服务发现中心
-
运行Nacos
- 使用课程提供的Nacos,一般我们启动课程资料给我们的虚拟机时,Nacos会自动执行,但是Nacos需要Mysql作为数据库,故Nacos必须在Mysql之后运行,单Mysql一般不会随虚拟机启动而启动,所以我们需要在启动Mysql后,在重启以下Nacos,保证不出错。
- 启动Nacos后,访问Nacos页面:虚拟机ip地址+8848/nacos,使用课程资料中的虚拟机的访问http://192.168.101.65:8848/nacos/即可。
- 账号:nacos,密码:nacos
-
在搭建Nacos服务发现中心之前需要搞清楚两个概念:namespace和group
-
namespace:用于区分环境、比如:开发环境、测试环境、生产环境。
-
group:用于区分项目,比如:xuecheng-plus项目、xuecheng2.0项目
-
-
首先在nacos配置namespace:
- 点击左侧菜单“命名空间”进入命名空间管理界面
- 点击“新建命名空间”,填写命名空间的相关信息,如下图
-
使用相同的办法再创建“测试环境”、“生产环境"的命名空间。
-
创建成功,如下图:
[!CAUTION]
注意:如果使用dev1010命名空间,在下边的配置中对namespace配置为dev1010。
-
命名空间创建完毕后,首先完成各服务注册到Naocs,下边将内容管理服务注册到nacos中。
-
在xuecheng-plus-parent中添加依赖管理
1 2 3 4 5 6 7
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>${spring-cloud-alibaba.version}</version> <type>pom</type> <scope>import</scope> </dependency>
-
在内容管理模块的接口工程、系统管理模块的接口工程中添加如下依赖
1 2 3 4
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency>
-
配置nacos的地址
- 在系统管理的接口工程的配置文件中配置如下信息:
1 2 3 4 5 6 7 8 9 10
#微服务配置 spring: application: name: system-service cloud: nacos: server-addr: 192.168.101.65:8848 discovery: namespace: dev group: xuecheng-plus-project
- 在内容管理的接口工程的配置文件中配置如下信息:
1 2 3 4 5 6 7 8 9 10
#微服务配置 spring: application: name: content-api cloud: nacos: server-addr: 192.168.101.65:8848 discovery: namespace: dev group: xuecheng-plus-project
-
重启内容管理服务、系统管理服务。
- 待微服务启动成功,进入Nacos服务查看服务列表
- 在 “开发环境” 命名空间下有两个服务这说明内容管理微服务和系统管理微服务在Nacos注册成功。
-
配置中心
配置三要素
-
搭建完成Nacos服务发现中心,下边搭建Nacos为配置中心,其目的就是通过Nacos去管理项目的所有配置。
-
先将项目中的配置文件分分类:
-
每个项目特有的配置
-
是指该配置只在有些项目中需要配置,或者该配置在每个项目中配置的值不同。
-
比如:spring.application.name每个项目都需要配置但值不一样,以及有些项目需要连接数据库而有些项目不需要,有些项目需要配置消息队列而有些项目不需要。
-
-
项目所公用的配置
-
是指在若干项目中配置内容相同的配置。比如:redis的配置,很多项目用的同一套redis服务所以配置也一样。
-
另外还需要知道nacos如何去定位一个具体的配置文件,即:namespace、group、dataid.
-
-
-
定位配置文件
-
通过namespace、group找到具体的环境和具体的项目。
-
通过dataid找到具体的配置文件,dataid有三部分组成
-
比如:content-service-dev.yaml配置文件 由(content-service)-(dev). (yaml)三部分组成
-
content-service:第一部分,它是在application.yaml中配置的应用名,即spring.application.name的值。
-
dev:第二部分,它是环境名,通过spring.profiles.active指定,
-
Yaml: 第三部分,它是配置文件 的后缀,目前nacos支持properties、yaml等格式类型,本项目选择yaml格式类型。
-
-
所以,如果我们要配置content-service工程的配置文件:
-
在开发环境中配置content-service-dev.yaml
-
在测试环境中配置content-service-test.yaml
-
在生产环境中配置content-service-prod.yaml
-
-
我们启动项目中传入spring.profiles.active的参数决定引用哪个环境的配置文件,例如:传入spring.profiles.active=dev表示使用dev环境的配置文件即content-service-dev.yaml。
-
配置content-service
- 下边以开发环境为例对content-service工程的配置文件进行配置,再开发环境命名空间中,添加
content-service-dev.yaml
配置
-
输入data id、group以及配置文件内容。
-
为什么没在nacos中配置下边的内容 ?
|
|
-
因为刚才说了dataid第一部分就是spring.application.name,再根据
${spring.application.name}-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
作为文件id,来读取配置。 -
而读取配置文件的顺序如下
- bootstrap.yml
- nacos中的配置文件
- 本地application.yml
-
所以spring.application.name不在nacos中配置,而是要在工程的本地(bootstrap.yaml)进行配置,然后nacos才知道怎么读取配置文件。
-
在content-service工程的test/resources 中添加bootstrap.yaml,内容如下:
|
|
- 在内容管理模块的接口工程和service工程配置依赖:
|
|
-
配置完成,运行content-service工程 的单元测试文件,能否正常测试,跟踪单元测试方法可以正常读取数据库的数据,说明从nacos读取配置信息正常。
-
通过运行观察控制台打印出下边的信息,NacosRestTemplate.java通过Post方式与nacos服务端交互读取配置信息。
|
|
配置content-api
- 以相同的方法配置content-api工程的配置文件,在nacos中的开发环境中配置content-api-dev.yaml,内容如下:
|
|
- 在content-api工程的本地配置bootstrap.yaml,内容如下:
|
|
- 注意:因为api接口工程依赖了service工程的jar,所以这里使用extension-configs扩展配置文件的方式引用service工程所用到的配置文件。
|
|
- 如果添加多个扩展文件,继续在下添加即可,如下:
|
|
- 启动content-api工程,查询控制台是否打印出了请求nacos的日志,如下:
|
|
- 使用Httpclient测试课程查询接口是否可以正常查询。
公用配置
-
还有一个优化的点是如何在nacos中配置项目的公用配置呢?
-
nacos提供了shared-configs可以引入公用配置。
-
在content-api中配置了swagger,所有的接口工程 都需要配置swagger,这里就可以将swagger的配置定义为一个公用配置,哪个项目用引入即可。
-
单独在xuecheng-plus-common分组下创建xuecheng-plus的公用配置,进入nacos的开发环境,添加swagger-dev.yaml公用配置
- 删除接口工程中对swagger的配置。
- 项目使用shared-configs可以引入公用配置。在接口工程的本地配置文件 中引入公用配置,如下:
|
|
- 再以相同的方法配置日志的公用配置。
- 在接口工程和业务工程,引入loggin-dev.yaml公用配置文件
|
|
配置完成,重启content-api接口工程,访问]http://localhost:63040/content/swagger-ui.html 查看swagger接口文档是否可以正常访问,查看控制台log4j2日志输出是否正常。
配置优先级
- 到目前为止已将所有微服务的配置统一在nacos进行配置,用到的配置文件有本地的配置文件 bootstrap.yaml和nacos上的配置文件,SpringBoot读取配置文件的顺序如下:
- 加在bootstarp.yaml,获取nacos地址、配置文件dataidid
- 根据dataid读取nacos中的配置
- 加在本地配置(例如application.yaml/application-dev.yaml),与nacos拉取的配置合并
-
引入配置文件的形式有:
- 以项目应用名方式引入(bootstrap)
- 以扩展配置文件方式引入
- 以共享配置文件方式引入
- 本地配置文件
-
各配置文件的优先级:项目应用名配置文件 > 扩展配置文件 > 共享配置文件 > 本地配置文件。
-
有时候我们在测试程序时直接在本地加一个配置进行测试,比如下边的例子:
- 我们想启动两个内容管理微服务,此时需要在本地指定不同的端口,通过VM Options参数,在IDEA配置启动参数
- 通过-D指定参数名和参数值,参数名即在bootstrap.yml中配置的server.port。
- 启动ContentApplication2,发现端口仍然是63040,这说明本地的配置没有生效。
- 这时我们想让本地最优先,可以在nacos配置文件 中配置如下即可实现:
1 2 3 4 5
#配置本地优先 spring: cloud: config: override-none: true
- 再次启动ContentApplication2,端口为63041。
导入配置文件
- 课程资料中提供了系统用的所有配置文件nacos_config_export.zip,下边将nacos_config_export.zip导入nacos。
- 进入具体的命名空间,点击“导入配置”
- 打开导入窗口

- 相同的配置选择跳过配置(上图选错了)。
- 点击“上传文件”选择资料中的nacos_config_export.zip开始导入。
配置系统管理服务
配置system-api
-
bootstrap中的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#微服务配置 spring: application: name: system-api cloud: nacos: server-addr: 192.168.101.65:8848 discovery: # 注册中心 namespace: dev group: xuecheng-plus-project config: # 配置中心 namespace: dev group: xuecheng-plus-project file-extension: yaml refresh-enabled: true extension-configs: - data-id: system-service-${spring.profiles.active}.yaml group: xuecheng-plus-project refresh: true shared-configs: - data-id: swagger-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true - data-id: logging-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true profiles: active: dev
-
nacos中的内容已经导入不要配置,即dataid为:system-api-dev.yaml的配置
配置system-service
- service项目中没有用到配置无需添加
- nacos中的内容已经导入不要配置,即dataid为:system-service-dev.yaml的配置
搭建Gateway
-
本项目使用Spring Cloud Gateway作为网关,下边创建网关工程。
-
新建一个网关工程。

- 工程结构

- 添加依赖:
|
|
- 配置网关的bootstrap.yaml配置文件
|
|
- 在nacos上配置网关策略
|
|
- 启动网关工程,通过网关工程访问微服务进行测试。
- 在http-client-env.json中配置网关的地址

- 使用httpclient测试课程查询接口,如下:
|
|
- 运行,观察是否可以正常访问接口
- 网关工程搭建完成即可将前端工程中的接口地址改为网关的地址

搭建媒资工程
-
至此网关、Nacos已经搭建完成,下边将媒资工程导入项目。
-
从黑马提供的课程资料中获取媒资工程 xuecheng-plus-media,拷贝到项目工程根目录。
-
下边做如下配置:
- 创建媒资数据库xc_media,并导入资料目录中的xcplus_media.sql
- 修改nacos上的media-service-dev.yaml配置文件中的数据库链接信息
- 重启media-api工程只要能正常启动成功即可,稍后根据需求写接口。
分布式文件系统
什么是分布式文件系统
- 要理解分布式文件系统首先了解什么是文件系统。
- 查阅维基百科:
计算机的文件系统是一种存储和组织计算机数据的方法,它使得对其访问和查找变得容易,文件系统使用文件和树形目录的抽象逻辑概念代替了硬盘和光盘等物理设备使用数据块的概念,用户使用文件系统来保存数据不必关心数据实际保存在硬盘(或者光盘)的地址为多少的数据块上,只需要记住这个文件的所属目录和文件名。在写入新数据之前,用户不必关心硬盘上的那个块地址没有被使用,硬盘上的存储空间管理(分配和释放)功能由文件系统自动完成,用户只需要记住数据被写入到了哪个文件中。
- 文件系统是负责管理和存储文件的系统软件,操作系统通过文件系统提供的接口去存取文件,用户通过操作系统访问磁盘上的文件。
- 下图指示了文件系统所处的位置:

-
常见的文件系统:FAT16/FAT32、NTFS、HFS、UFS、APFS、XFS、Ext4等 。
-
现在有个问题,一此短视频平台拥有大量的视频、图片,这些视频文件、图片文件该如何存储呢?如何存储可以满足互联网上海量用户的浏览。
-
分布式文件系统就是海量用户查阅海量文件的方案。
-
我们阅读维基百科去理解分布式文件系统的定义:
相对于本机端的文件系统而言,分布式文件系统(英语:Distributed file system, DFS),或是网络文件系统(英语:Network File System),是一种允许文件透过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间。
- 通过概念可以简单理解为:一个计算机无法存储海量的文件,通过网络将若干计算机组织起来共同去存储海量的文件,去接收海量用户的请求,这些组织起来的计算机通过网络进行通信,如下图:

-
好处:
- 一台计算机的文件系统处理能力扩充到多台计算机同时处理。
- 一台计算机挂了还有另外副本计算机提供数据。
- 每台计算机可以放在不同的地域,这样用户就可以就近访问,提高访问速度。
-
市面上有哪些分布式文件系统的产品呢?
- NFS
网络文件系统(英语:Network File System,缩写作 NFS)是一种分布式文件系统,力求客户端主机可以访问服务器端文件,并且其过程与访问本地存储时一样,它由Sun微系统(已被甲骨文公司收购)开发,于1984年发布[1]。
它基于开放网络运算远程过程调用(ONC RPC)系统:一个开放、标准的RFC系统,任何人或组织都可以依据标准实现它。
- 特点:
- 在客户端上映射NFS服务器的驱动器。
- 客户端通过网络访问NFS服务器的硬盘完全透明。
- GFS
Google文件系统(英语:Google File System,缩写为GFS或GoogleFS),一种专有分布式文件系统,由Google公司开发,运行于Linux平台上[1]。尽管Google在2003年公布了该系统的一些技术细节,但Google并没有将该系统的软件部分作为开源软件发布[2]。
- 特点:
- GFS采用主从结构,一个GFS集群由一个master和大量的chunkserver组成。
- master存储了数据文件的元数据,一个文件被分成了若干块存储在多个chunkserver中。
- 用户从master中获取数据元信息,向chunkserver存储数据。
- HDFS
HDFS,是Hadoop Distributed File System的简称,是Hadoop抽象文件系统的一种实现。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。 HDFS的文件分布在集群机器上,同时提供副本进行容错及可靠性保证。例如客户端写入读取文件的直接操作都是分布在集群各个机器上的,没有单点性能压力。
- 特点
- HDFS采用主从结构,一个HDFS集群由一个名称结点和若干数据结点组成。
- 名称结点存储数据的元信息,一个完整的数据文件分成若干块存储在数据结点。
- 客户端从名称结点获取数据的元信息及数据分块的信息,得到信息客户端即可从数据块来存取数据。
-
云计算厂家
- 阿里云对象存储服务(Object Storage Service,简称 OSS),是阿里云提供的海量、安全、低成本、高可靠的云存储服务。其数据设计持久性不低于 99.9999999999%(12 个 9),服务设计可用性(或业务连续性)不低于 99.995%。
- 百度对象存储BOS提供稳定、安全、高效、高可扩展的云存储服务。您可以将任意数量和形式的非结构化数据存入BOS,并对数据进行管理和处理。BOS支持标准、低频、冷和归档存储等多种存储类型,满足多场景的存储需求。
MinIo
介绍
-
本项目采用MinIO构建分布式文件系统,MinIO 是一个非常轻量的服务,可以很简单的和其他应用的结合使用,它兼容亚马逊 S3 云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。
-
它一大特点就是轻量,使用简单,功能强大,支持各种平台,单个文件最大5TB,兼容 Amazon S3接口,提供了 Java、Python、GO等多版本SDK支持。
-
MinIO集群采用去中心化共享架构,每个结点是对等关系,通过Nginx可对MinIO进行负载均衡访问。
-
去中心化有什么好处?
-
在大数据领域,通常的设计理念都是无中心和分布式。Minio分布式模式可以帮助你搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置。
-
它将分布在不同服务器上的多块硬盘组成一个对象存储服务。由于硬盘分布在不同的节点上,分布式Minio避免了单点故障。如下图:
-
Minio使用纠删码技术来保护数据,它是一种恢复丢失和损坏数据的数学算法,它将数据分块冗余的分散存储在各各节点的磁盘上,所有的可用磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验块会分散的存储在这8块硬盘上。
-
使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍然可以恢复数据。 比如上边集合中有4个以内的硬盘损害仍可保证数据恢复,不影响上传和下载,如果多于一半的硬盘坏了则无法恢复。
-
测试Docker环境
-
开发阶段和生产阶段统一使用Docker下的MINIO。
-
在下发的虚拟机中已安装了MinIO的镜像和容器,执行docker start minio(或者sh /data/soft /restart.sh)启动Docker下的MinIO
-
启动完成登录MinIO查看是否正常。

-
本项目创建两个buckets(点击 create bucket 即可):
-
mediafiles: 普通文件
-
video:视频文件
[!CAUTION]
注意:
- 为bucket取名时,不能用大写字母
- 创建完bucket后,点击manage,进去修改访问权限为public
-
SDK
上传文件
-
MinIO提供多个语言版本SDK的支持,下边找到java版本的文档:
-
地址:https://docs.min.io/docs/java-client-quickstart-guide.html
-
最低需求Java 1.8或更高版本:
-
maven依赖如下:
|
|
-
在media-service工程添加此依赖。
-
参数说明:
-
需要三个参数才能连接到minio服务。
参数 说明 Endpoint 对象存储服务的URL Access Key Access key就像用户ID,可以唯一标识你的账户。 Secret Key Secret key是你账户的密码。 -
官方的示例代码如下:
|
|
- 参考示例在media-service工程中测试上传文件功能,
- 首先创建一个用于测试的bucket(testbucket),修改访问权限为public
- 在xuecheng-plus-media-service工程 的test下编写测试代码如下:
|
|
-
执行upload方法,分别测试向桶的根目录上传文件以及子目录上传文件。
-
上传成功,通过web控制台查看文件,并预览文件。
-
说明:
-
设置contentType可以通过com.j256.simplemagic.ContentType枚举类查看常用的mimeType(媒体类型)
-
通过扩展名得到mimeType,代码如下:
1 2 3 4 5 6 7
//根据扩展名取出mimeType ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(".mp4"); String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;//通用mimeType,字节流 //通过扩展名来获得mimeType的类型 if(extensionMatch!=null){ mimeType = extensionMatch.getMimeType(); }
- 完善上边的代码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
@Test public void upload() { //根据扩展名取出mimeType ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(".mp4"); String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;//通用mimeType,字节流 //通过扩展名来获得mimeType的类型 if(extensionMatch!=null){ mimeType = extensionMatch.getMimeType(); } try { UploadObjectArgs testbucket = UploadObjectArgs.builder() .bucket("testbucket") // .object("test001.mp4") .object("001/test001.mp4")//添加子目录 .filename("D:\\develop\\upload\\1mp4.temp") .contentType(mimeType)//默认根据扩展名确定文件内容类型,也可以指定 .build(); minioClient.uploadObject(testbucket); System.out.println("上传成功"); } catch (Exception e) { e.printStackTrace(); System.out.println("上传失败"); } }
-
删除文件
|
|
查询文件
- 通过查询文件查看文件是否存在minio中。
- 参考:https://docs.min.io/docs/java-client-api-reference#getObject
|
|
- 校验文件的完整性,对文件计算出md5值,比较原始文件的md5和目标文件的md5,一致则说明完整
|
|
上传图片
需求分析
业务流程
-
课程图片是宣传课程非常重要的信息,在新增课程界面上传课程图片,也可以修改课程图片。
-
上传课程图片总体上包括两部分:
- 上传课程图片前端请求媒资管理服务将文件上传至分布式文件系统,并且在媒资管理数据库保存文件信息。
- 上传图片成功保存图片地址到课程基本信息表中。
-
详细流程如下:
- 前端进入上传图片界面
- 上传图片,请求媒资管理服务。
- 媒资管理服务将图片文件存储在MinIO。
- 媒资管理记录文件信息到数据库。
- 前端请求内容管理服务保存课程信息,在内容管理数据库保存图片地址。
数据模型
- 涉及到的数据表有:课程信息表中的图片字段、媒资数据库的文件表,下边主要看媒资数据库的文件表:

准备环境
- 在nacos配置中minio的相关信息,进入media-service-dev.yaml:
|
|
- 在media-service工程编写minio的配置类:
|
|
[!CAUTION]
注意:若要使用**@ConfigurationPropertites**,要再类中添加Getter和Setter方法,该注解就是通过这两个方法进行配置的注入的,并且由于该类不在启动类的包下,故要在启动类上加上**@EnableConfigurationProperties(MinioConfig.class)**注解
接口定义
-
根据需求分析,下边进行接口定义,此接口定义为一个通用的上传文件接口,可以上传图片或其它文件。
-
首先分析接口:
-
请求地址:/media/upload/coursefile
-
请求内容:Content-Type: multipart/form-data;form-data; name=“filedata”; filename=“具体的文件名称”
-
响应参数:文件信息,如下
|
|
- 定义上传响应模型类:
|
|
- 定义接口如下:
|
|
-
接口定义好后可以用httpclient工具测试一下
-
使用httpclient测试
|
|
接口开发
Dao开发
- 根据需求分析DAO层实现向media_files表插入一条记录,使用media_files表生成的mapper即可。、
Service开发
-
Service方法需要提供一个更加通用的保存文件的方法。
-
定义请求参数类:
|
|
- 定义service接口:
|
|
- 实现方法如下:
|
|
完善接口层
- 完善接口层代码 :
|
|
接口测试
- 首先使用httpclient测试
|
|
- 再进行前后端联调测试
- 在新增课程、编辑课程界面上传图,保存课程信息后再次进入编辑课程界面,查看是否可以正常保存课程图片信息。
- 上图图片完成后,进入媒资管理,查看文件列表中是否有刚刚上传的图片信息。
Service事务优化
- 上边的service方法优化后并测试通过,现在思考关于uploadFile方法的是否应该开启事务。
- 目前是在uploadFile方法上添加@Transactional,当调用uploadFile方法前会开启数据库事务,如果上传文件过程时间较长那么数据库的事务持续时间就会变长,这样数据库链接释放就慢,最终导致数据库链接不够用。
- 我们只将addMediaFilesToDb方法添加事务控制即可,uploadFile方法上的@Transactional注解去掉。
- 优化后如下:
|
|
-
我们人为在int insert = mediaFilesMapper.insert(mediaFiles);下边添加一个异常代码int a=1/0;
-
测试是否事务控制。很遗憾,事务控制失败。
-
方法上已经添加了@Transactional注解为什么该方法不能被事务控制呢?
-
如果是在uploadFile方法上添加@Transactional注解就可以控制事务,去掉则不行。
-
现在的问题其实是一个非事务方法调同类一个事务方法,事务无法控制,这是为什么?
-
下边分析原因:
- 如果在uploadFile方法上添加@Transactional注解,代理对象执行此方法前会开启事务,如下图:
- 如果在uploadFile方法上没有@Transactional注解,代理对象执行此方法前不进行事务控制,如下图:
-
所以判断该方法是否可以事务控制必须保证
- 是通过代理对象调用此方法,
- 且此方法上添加了@Transactional注解。
-
现在在addMediaFilesToDb方法上添加@Transactional注解,也不会进行事务控制是因为并不是通过代理对象执行的addMediaFilesToDb方法。
-
为了判断在uploadFile方法中去调用addMediaFilesToDb方法是否是通过代理对象去调用,我们可以打断点跟踪。
-
我们发现在uploadFile方法中去调用addMediaFilesToDb方法不是通过代理对象去调用。
-
如何解决呢?通过代理对象去调用addMediaFilesToDb方法即可解决。
-
在MediaFileService的实现类中注入MediaFileService的代理对象,如下:
1 2
@Autowired MediaFileService currentProxy;
-
将addMediaFilesToDb方法提成接口。
|
|
- 通过代理对象调用addMediaFilesToDb:
|
|
- 再次测试事务是否可以正常控制。
上传视频
需求分析
业务流程
- 教学机构人员进入媒资管理列表查询自己上传的媒资文件。
- 教育机构用户在"媒资管理"页面中点击 “上传视频” 按钮。
- 选择要上传的文件,自动执行文件上传。
- 视频上传成功会自动处理,处理完成可以预览视频。
断点续传技术
什么是断点续传
-
通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。
-
什么是断点续传,引用百度百科:
断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压缩包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载未完成的部分,而没有必要从头开始上传下载,断点续传可以提高节省操作时间,提高用户体验性。
-
断点续传流程如下图:
- 流程如下:
- 前端上传前先把文件分成块
- 一块一块的上传,上传中断后重新上传,已上传的分块则不用再上传
- 各分块上传完成最后在服务端合并文件
分块与合并测试
-
为了更好的理解文件分块上传的原理,下边用java代码测试文件的分块与合并。
-
文件分块的流程如下:
- 获取源文件长度
- 根据设定的分块文件的大小计算出块数
- 从源文件读数据依次向每一个块文件写数据。
-
测试代码如下:
|
|
- 文件合并流程:
- 找到要合并的文件并按文件合并的先后进行排序。
- 创建合并文件
- 依次从合并的文件中读取数据向合并文件写入数
- 文件合并的测试代码 :
|
|
视频上传流程
- 下图是上传视频的整体流程:
- 前端对文件进行分块。
- 前端上传分块文件前请求媒资服务检查文件是否存在,如果已经存在则不再上传。
- 如果分块文件不存在则前端开始上传
- 前端请求媒资服务上传分块。
- 媒资服务将分块上传至MinIO。
- 前端将分块上传完毕请求媒资服务合并分块。
- 媒资服务判断分块上传完成则请求MinIO合并文件。
- 合并完成校验合并后的文件是否完整,如果完整则上传完成,否则删除文件。
minio合并文件测试
-
将分块文件上传至minio
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/** * 将分块文件上传的minio */ @Test public void testUploadChunk() throws IOException, ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException { // 上传文件到minio String objectName = "test/chunk/"; String filePath = "D:\\MyFile\\BUCT\\chunk\\"; for (int i =0 ;i <3;i++) { UploadObjectArgs testbucket = UploadObjectArgs.builder() .bucket("testbucket") //桶名 .object(objectName+i) //文件名 放在子目录下 .filename(filePath+i) //文件路径 .build(); minioClient.uploadObject(testbucket); System.out.println("上传分片文件成功:"+i); } }
-
通过minio的合并文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/** * 用minio提供的接口合并分片文件 */ @Test public void testMerge() throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException { List<ComposeSource> sources =new ArrayList<>(); for (int i=0; i<3 ;i++) { ComposeSource composeSource = ComposeSource.builder() .bucket("testbucket") .object("test/chunk/" + i) .build(); sources.add(composeSource); } ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder() .bucket("testbucket") .object("test/merge/merge01.mp4") .sources(sources) .build(); //报错: size 1048576 must be greater than 5242880 minioClient.composeObject(composeObjectArgs); }
[!NOTE]
使用minio合并文件报错:java.lang.IllegalArgumentException: source testbucket/chunk/0: size 1048576 must be greater than 5242880
minio合并文件默认分块最小5M,我们将分块改为5M再次测试。
在nacos中的media-service中增加如下配置
1 2 3 4 5
spring: servlet: multipart: max-file-size: 50MB max-request-size: 50MB
接口定义
-
根据上传视频流程,定义接口,与前端的约定是操作成功返回{code:0}否则返回{code:-1}
-
从课程资料中拷贝RestResponse.java类到base工程下的model包下。
-
定义接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
/** * @author Mr.M * @version 1.0 */ @Api(value = "大文件上传接口", tags = "大文件上传接口") @RestController public class BigFilesController { @ApiOperation(value = "文件上传前检查文件") @PostMapping("/upload/checkfile") public RestResponse<Boolean> checkfile( @RequestParam("fileMd5") String fileMd5 ) throws Exception { return null; } @ApiOperation(value = "分块文件上传前的检测") @PostMapping("/upload/checkchunk") public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception { return null; } @ApiOperation(value = "上传分块文件") @PostMapping("/upload/uploadchunk") public RestResponse uploadchunk(@RequestParam("file") MultipartFile file, @RequestParam("fileMd5") String fileMd5, @RequestParam("chunk") int chunk) throws Exception { return null; } @ApiOperation(value = "合并文件") @PostMapping("/upload/mergechunks") public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5, @RequestParam("fileName") String fileName, @RequestParam("chunkTotal") int chunkTotal) throws Exception { return null; } }
上传分块开发
DAO开发
向媒资数据库的文件表插入记录,使用自动生成的Mapper接口即可满足要求。
Service开发
检查文件和分块
-
接口完成进行接口实现,首先实现检查文件方法和检查分块方法,若存在,则无需上传
-
在MediaFileService中定义service接口如下:
|
|
- service接口实现方法:
|
|
上传分块
- 定义service接口
|
|
- 接口实现
|
|
上传分块测试
- 完善接口层:
|
|
- 启动前端工程,进入上传视频界面进行前后端联调测试。
合并分块开发
service开发
- 定义service接口
|
|
- service实现
|
|
[!CAUTION]
这里为了保证事物的最小性,没有将事物直接写在主方法上,而是写在了被调用的方法上,但是这样就会使事物失效,要使事物有效,就必须用代理对象来调用该事务。
合并分块测试
-
下边进行前后端联调:
- 上传一个视频测试合并分块的执行逻辑,进入service方法逐行跟踪。
- 断点续传测试
-
上传一部分后,停止刷新浏览器再重新上传,通过浏览器日志发现已经上传过的分块不再重新上传
视频处理
视频转码需求
什么是视频编码
-
视频上传成功后需要对视频进行转码处理。
-
什么是视频编码?查阅百度百科如下:
所谓视频编码方式就是指通过压缩技术,将原始视频格式的文件转换成另一种视频格式文件的方式。视频流传输中最为重要的编解码标准有国际电联的H.261、H.263、H.264,运动静止图像专家组的M-JPEG和国际标准化组织运动图像专家组的MPEG系列标准,此外在互联网上被广泛应用的还有Real-Networks的RealVideo、微软公司的WMV以及Apple公司的QuickTime等。
-
首先我们要分清文件格式和编码格式:
-
文件格式:是指.mp4、.avi、.rmvb等,这些不同扩展名的视频文件的文件格式 ,视频文件的内容主要包括视频和音频,其文件格式是按照一 定的编码格式去编码,并且按照该文件所规定的封装格式将视频、音频、字幕等信息封装在一起,播放器会根据它们的封装格式去提取出编码,然后由播放器解码,最终播放音视频。
-
音视频编码格式:通过音视频的压缩技术,将视频格式转换成另一种视频格式,通过视频编码实现流媒体的传输。比如:一个.avi的视频文件原来的编码是a,通过编码后编码格式变为b,音频原来为c,通过编码后变为d。
-
-
音视频编码格式各类繁多,主要有几下几类:
-
MPEG系列(由ISO[国际标准组织机构]下属的MPEG[运动图象专家组]开发 )
- 视频编码方面主要是Mpeg1(vcd用的就是它)、Mpeg2(DVD使用)、Mpeg4(的DVDRIP使用的都是它的变种,如:divx,xvid等)、Mpeg4 AVC(正热门);音频编码方面主要是MPEG Audio Layer 1/2、MPEG Audio Layer 3(大名鼎鼎的mp3)、MPEG-2 AAC 、MPEG-4 AAC等等。注意:DVD音频没有采用Mpeg的。
-
H.26X系列(由ITU[国际电传视讯联盟]主导,侧重网络传输,注意:只是视频编码)
- 包括H.261、H.262、H.263、H.263+、H.263++、H.264(就是MPEG4 AVC-合作的结晶)
-
-
目前最常用的编码标准是视频H.264,音频AAC。
FFmpeg 的基本使用
-
我们将视频录制完成后,使用视频编码软件对视频进行编码,本项目 使用FFmpeg对视频进行编码 。
-
FFmpeg被许多开源项目采用,QQ影音、暴风影音、VLC等。
-
或者从常用工具软件目录找到ffmpeg.exe,并将ffmpeg.exe加入环境变量path中。
-
测试是否正常:cmd运行 ffmpeg -version
-
安装成功,作下简单测试
-
将一个.avi文件转成mp4、mp3、gif等。
-
比如我们将nacos.avi文件转成mp4,运行如下命令:
D:\DEVELOP\ffmpeg\ffmpeg.exe -i 1.avi 1.mp4
这里前面的部分为FFmpeg在你电脑中的位置 -
也可以将ffmpeg.exe配置到环境变量path中,进入视频目录直接运行:ffmpeg.exe -i 1.avi 1.mp4
-
转成mp3:ffmpeg -i nacos.avi nacos.mp3
-
转成gif:ffmpeg -i nacos.avi nacos.gif
-
官方文档(英文):http://ffmpeg.org/ffmpeg.html
视频处理工具类
- 将课程资料的工具类中的util拷贝至base工程。
- 其中Mp4VideoUtil类是用于将视频转为mp4格式,是我们项目要使用的工具类。
- 下边看下这个类的代码,并进行测试。
- 我们要通过ffmpeg对视频转码,Java程序调用ffmpeg,使用java.lang.ProcessBuilder去完成,具体在Mp4VideoUtil类的63行,下边进行简单的测试,下边的代码运行本机安装的QQ软件。
|
|
- 对Mp4VideoUtil类需要学习使用方法,下边代码将一个avi视频转为mp4视频,如下:
|
|
- 执行main方法,最终在控制台输出 success 表示执行成功。
分布式任务处理
什么是分布式任务调度
-
对一个视频的转码可以理解为一个任务的执行,如果视频的数量比较多,如何去高效处理一批任务呢?
-
如下有两种方案:
- 多线程
- 多线程是充分利用单机的资源。
- 分布式加多线程
- 充分利用多台计算机,每台计算机使用多线程处理。
- 多线程
-
方案2可扩展性更强。
-
方案2是一种分布式任务调度的处理方案。
-
什么是分布式任务调度?
-
我们可以先思考一下面业务场景的解决方案:
- 每隔24小时执行数据备份任务。
- 12306网站会根据车次不同,设置几个时间点分批次放票。
- 某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。
- 商品成功发货后,需要向客户发送短信提醒。
-
类似的场景还有很多,我们该如何实现?
-
多线程方式实现:
- 学过多线程的同学,可能会想到,我们可以开启一个线程,每sleep一段时间,就去检查是否已到预期执行时间。
-
以下代码简单实现了任务调度的功能:
|
|
-
上面的代码实现了按一定的间隔时间执行任务调度的功能。
-
Jdk也为我们提供了相关支持,如Timer、ScheduledExecutor,下边我们了解下。
-
Timer实现方式
|
|
-
Timer 的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行。
-
ScheduledExecutor方式实现:
|
|
-
Java 5 推出了基于线程池设计的 ScheduledExecutor,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。
-
Timer 和 ScheduledExecutor 都仅能提供基于开始时间与重复间隔的任务调度,不能胜任更加复杂的调度需求。比如,设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据等等。
-
第三方Quartz方式实现,项目地址:https://github.com/quartz-scheduler/quartz
-
Quartz 是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求,Quartz 设计的核心类包括 Scheduler, Job 以及 Trigger。其中,Job 负责定义需要执行的任务,Trigger 负责设置调度策略,Scheduler 将二者组装在一起,并触发任务开始执行。Quartz支持简单的按时间间隔调度、还支持按日历调度方式,通过设置CronTrigger表达式(包括:秒、分、时、日、月、周、年)进行任务调度。
-
下边是一个例子代码:
|
|
-
任务调度顾名思义,就是对任务的调度,它是指系统为了完成特定业务,基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。
-
什么是分布式任务调度?
-
通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:
- 分布式调度要实现的目标:
- 不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:
- 并行任务调度
- 并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
- 如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
- 高可用
- 若某一个实例宕机,不影响其他实例来执行任务。
- 弹性扩容
- 当集群中增加实例就可以提高并执行任务的处理效率。
- 任务管理与监测
- 对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
- 避免任务重复执行
- 当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。
- 并行任务调度
XXL-JOB介绍
-
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
-
XXL-JOB主要有调度中心、执行器、任务:
-
调度中心:
-
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;
-
主要职责为执行器管理、任务管理、监控运维、日志管理等
-
-
任务执行器:
-
负责接收调度请求并执行任务逻辑;
-
主要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
-
-
任务:负责执行具体的业务处理。
-
调度中心与执行器之间的工作流程如下:
- 执行流程:
- 任务执行器根据配置的调度中心的地址,自动注册到调度中心
- 达到任务触发条件,调度中心下发任务
- 执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
- 执行器消费内存队列中的执行结果,主动上报给调度中心
- 当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
搭建XXL-JOB
调度中心
-
首先下载XXL-JOB
-
项目使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
-
也可从课程资料目录获取,解压xxl-job-2.3.1.zip
-
使用IDEA打开解压后的目录

-
xxl-job-admin:调度中心
-
xxl-job-core:公共依赖
-
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)
- xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
- xxl-job-executor-sample-frameless:无框架版本;
-
doc :文档资料,包含数据库脚本
-
在下发的虚拟机的MySQL中已经创建了xxl_job_2.3.1数据库
-
使用虚拟机执行
docker start xxl-job-admin
自动启动xxl-job调度中心 -
账号和密码:admin/123456
-
如果无法使用虚拟机运行xxl-job可以在本机idea运行xxl-job调度中心。
执行器
-
下边配置执行器,执行器负责与调度中心通信接收调度中心发起的任务调度请求。
- 下边进入调度中心添加执行器
- 点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。
- 首先在媒资管理模块的service工程添加依赖,在项目的父工程已约定了版本2.3.1
1 2 3 4
<dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> </dependency>
- 在nacos下的media-service-dev.yaml下配置xxl-job
1 2 3 4 5 6 7 8 9 10 11 12
xxl: job: admin: addresses: http://192.168.101.65:8088/xxl-job-admin executor: appname: media-process-service address: ip: port: 9999 logpath: /data/applogs/xxl-job/jobhandler logretentiondays: 30 accessToken: default_token
- 配置xxl-job的执行器
- 将xxl-job示例工程下配置类拷贝到媒资管理的service工程下
-
到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器,下边准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动媒资管理模块的接口工程。
-
启动后观察日志,出现下边的日志表示执行器在调度中心注册成功
- 同时观察调度中心中的执行器界面,在线机器地址处已显示1个执行器。
执行任务
-
下边编写任务,参考示例工程中任务类的编写方法。
-
在媒资服务service包下新建jobhandler存放任务类,下边参考示例工程编写一个任务类
|
|
-
下边在调度中心添加任务,进入任务管理
-
点击新增,填写任务信息
-
调度类型:
-
固定速度指按固定的间隔定时调度。
-
Cron,通过Cron表达式实现更丰富的定时调度策略。
-
Cron表达式是一个字符串,通过它可以定义调度策略,格式如下:
-
{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
-
xxl-job提供图形界面去配置
-
-
-
运行模式:有BEAN和GLUE
- bean模式较常用就是在项目工程中编写执行器的任务代码
- GLUE是将任务代码编写在调度中心。
-
JobHandler即任务方法名,填写任务方法上@XxlJob注解中的名称。
-
路由策略:当执行器集群部署时,调度中心向哪个执行器下发任务,这里选择第一个表示只向第一个执行器下发任务,路由策略的其它选项稍后在分片广播章节详细解释。
-
-
添加成功,启动任务
-
通过调度日志查看任务执行情况
-
下边启动媒资管理的service工程,启动执行器。
-
观察执行器方法的执行。
-
如果要停止任务需要在调度中心操作
-
任务跑一段时间注意清理日志
分片广播
- 掌握了xxl-job的基本使用,下边思考如何进行分布式任务处理呢?如下图,我们会启动多个执行器组成一个集群,去执行任务。
- 执行器在集群部署下调度中心有哪些路由策略呢?
- 查看xxl-job官方文档,阅读高级配置相关的内容:
|
|
- 下边要重点说的是分片广播策略,分片是指是调度中心以执行器为维度进行分片,将集群中的执行器标上序号:0,1,2,3…,广播是指每次调度会向集群中的所有执行器发送任务调度,请求中携带分片参数。
-
每个执行器收到调度请求同时接收分片参数。
-
xxl-job支持动态扩容执行器集群从而动态增加分片数量,当有任务量增加可以部署更多的执行器到集群中,调度中心会动态修改分片的数量。
-
作业分片适用哪些场景呢?
-
分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
-
广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
-
-
所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
-
使用说明:
- “分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
-
Java语言任务获取分片参数方式:
- BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务"ShardingJobHandler”:
|
|
视频处理技术方案
作业分片方案
-
掌握了xxl-job的分片广播调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。
-
任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会查询到重复的任务呢?
-
XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号,在向执行器任务调度的同时下发分片总数以及分片序号等参数,执行器收到这些参数根据自己的业务需求去利用这些参数。
-
下图表示了多个执行器获取视频处理任务的结构:
-
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上分片总数,如果等于分片序号则执行此任务。
-
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
- 1 % 2 = 1 执行器2执行
- 2 % 2 = 0 执行器1执行
- 3 % 2 = 1 执行器2执行
- 以此类推
保证任务不重复执行
-
通过作业分片方案保证了执行器之间查询到不重复的任务,如果一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
-
首先配置调度过期策略:
-
查看文档如下:
-
调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等;
-
忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
-
立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
-
这里我们选择忽略,如果立即执行一次就可能重复执行相同的任务。
-
-
-
其次,再看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束时调度中心进行任务调度,此时该如何处理。
-
查看文档如下:
-
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
-
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
-
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
-
这里如果选择覆盖之前调度则可能重复执行任务,这里选择丢弃后续调度或单机串行方式来避免任务重复执行。
-
-
只做这些配置可以保证任务不会重复执行吗?
-
做不到,还需要保证任务处理的幂等性。
-
什么是幂等性?
-
它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。
-
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
-
解决幂等性常用的方案:
- 数据库约束,比如:唯一索引,主键。
- 乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
- 唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
-
基于以上分析,在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性,要有办法去判断该视频是否处理完成,如果正在处理中或处理完则不再处理。这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。
视频处理方案
- 确定了分片方案,下边梳理整个视频上传及处理的业务流程。
-
上传视频成功向视频处理待处理表添加记录。
-
视频处理的详细流程如下:
- 任务调度中心广播作业分片。
- 执行器收到广播作业分片,从数据库读取待处理任务,读取未处理及处理失败的任务。
- 执行器更新任务为处理中,根据任务内容从MinIO下载要处理的文件。
- 执行器启动多线程去处理任务。
- 任务处理完成,上传处理后的视频到MinIO。
- 将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。
查询待处理任务
需求分析
-
查询待处理任务只处理未提交及处理失败的任务,任务处理失败后进行重试,最多重试3次。
-
任务处理成功将待处理记录移动到历史任务表。
-
下图是待处理任务表:
- 历史任务表与待处理任务表的结构相同。
添加待处理任务
- 上传视频成功向视频处理待处理表添加记录,暂时只添加对avi视频的处理记录。
- 根据MIME Type去判断是否是avi视频,下边列出部分MIME Type
Video Type | Extension | MIME Type |
---|---|---|
MPEG-4 Video | .mp4 | video/mp4 |
QuickTime Video | .mov | video/quicktime |
AVI Video | .avi | video/x-msvideo |
WMV Video | .wmv | video/x-ms-wmv |
Flash Video | .flv | video/x-flv |
Ogg Video | .ogv | video/ogg |
WebM Video | .webm | video/webm |
3GP Video | .3gp | video/3gpp |
3G2 Video | .3g2 | video/3gpp2 |
AVCHD Video | .mts | video/avchd |
MPEG Video | .mpe | video/mpeg |
DVD Video | .vob | video/dvd |
-
avi视频的MIME Type是video/x-msvideo
-
修改文件信息入库方法,如下:
|
|
- 进行前后端测试,上传4个avi视频,观察待处理任务表是否存在记录,记录是否完成。
查询待处理任务
-
如何保证查询到的待处理视频记录不重复?
-
编写根据分片参数获取待处理任务的DAO方法,在
MediaProcessMapper
中定义DAO接口如下:
|
|
- 定义Service接口,查询待处理
|
|
- service接口实现
|
|
开始执行任务
分布式锁
-
前边分析了保证任务不重复执行的方案,理论上每个执行器分到的任务是不重复的,但是当在执行器弹性扩容时无法绝对避免任务不重复执行,比如:原来有四个执行器正在执行任务,由于网络问题原有的0、1号执行器无法与调度中心通信,调度中心就会对执行器重新编号,原来的3、4执行器可能就会执行和0、1号执行器相同的任务。
-
为了避免多线程去争抢同一个任务可以使用synchronized同步锁去解决,如下代码:
|
|
- synchronized只能保证同一个虚拟机中多个线程去争抢锁。
-
如果是多个执行器分布式部署,并不能保证同一个视频只有一个执行器去处理。
-
现在要实现分布式环境下所有虚拟机中的线程去同步执行就需要让多个虚拟机去共用一个锁,虚拟机可以分布式部署,锁也可以分布式部署,如下图:
-
虚拟机都去抢占同一个锁,锁是一个单独的程序提供加锁、解锁服务。
-
该锁已不属于某个虚拟机,而是分布式部署,由多个虚拟机所共享,这种锁叫分布式锁。
-
实现分布式锁的方案有很多,常用的如下:
-
基于数据库实现分布锁
- 利用数据库主键唯一性的特点,或利用数据库唯一索引、行级锁的特点,比如:多个线程同时向数据库插入主键相同的同一条记录,谁插入成功谁就获取锁,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。
-
基于redis实现锁
-
redis提供了分布式锁的实现方案,比如:SETNX、set nx、redisson等。
-
拿SETNX举例说明,SETNX命令的工作过程是去set一个不存在的key,多个线程去设置同一个key只会有一个线程设置成功,设置成功的的线程拿到锁。
-
-
使用zookeeper实现
- zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。
-
-
本次选用数据库实现分布锁,后边的模块会选用其它方案到时再详细介绍。
开启任务
-
下边基于数据库方式实现分布锁,开始执行任务将任务执行状态更新为4表示任务执行中。
-
下边的sql语句可以实现更新操作:
|
|
-
如果是多个线程去执行该sql都将会执行成功,但需求是只能有一个线程抢到锁,所以此sql无法满足需求。
-
使用乐观锁方式实现更新操作:
|
|
-
多个线程同时执行上边的sql只会有一个线程执行成功。
-
什么是乐观锁、悲观锁?
-
synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。
-
乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试。
-
数据库的乐观锁实现方式是在表中增加一个version字段,更新时判断是否等于某个版本,等于则更新否则更新失败,如下方式。
-
|
|
-
实现如下:
-
在MediaProcessMapper中定义mapper
|
|
- 在MediaFileProcessService中定义接口
|
|
更新任务状态
-
任务处理完成需要更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录。
-
在MediaFileProcessService接口添加方法
|
|
- service接口方法实现如下
|
|
视频处理
-
视频采用并发处理,每个视频使用一个线程去处理,每次处理的视频数量不要超过cpu核心数。
-
所有视频处理完成结束本次执行,为防止代码异常出现无限期等待则添加超时设置,到达超时时间还没有处理完成仍结束任务。
-
定义任务类VideoTask 如下:
|
|
测试
基本测试
-
进入xxl-job调度中心添加执行器和视频处理任务
-
在xxl-job配置任务调度策略:
- 配置阻塞处理策略为:丢弃后续调度。
- 配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些,如:5分钟,即使到达调度时间如果视频没有处理完会丢弃调度请求。
-
配置完成开始测试视频处理:
- 首先上传至少4个视频,非mp4格式。
- 在xxl-job启动视频处理任务
- 观察媒资管理服务后台日志
失败测试
- 先停止调度中心的视频处理任务。
- 上传视频,手动修改待处理任务表中file_path字段为一个不存在的文件地址
- 启动任务
- 观察任务处理失败后是否会重试,并记录失败次数。
抢占任务测试
- 修改调度中心中视频处理任务的阻塞处理策略为“覆盖之间的调度”
- 在抢占任务代码处打断点并选择支持多线程方式
- 在抢占任务代码处的下边两行代码分别打上断点,避免观察时代码继续执行。
- 启动任务
- 此时多个线程执行都停留在断点处
- 依次放行,观察同一个任务只会被一个线程抢占成功。
其他问题
任务补偿机制
-
如果有线程抢占了某个视频的处理任务,如果线程处理过程中挂掉了,该视频的状态将会一直是处理中,其它线程将无法处理,这个问题需要用补偿机制。
-
单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务,将任务的状态改为执行失败。
-
任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。
达到最大失败次数
- 当任务达到最大失败次数时一般就说明程序处理此视频存在问题,这种情况就需要人工处理,在页面上会提示失败的信息,人工可手动执行该视频进行处理,或通过其它转码工具进行视频转码,转码后直接上传mp4视频。
分块文件清理问题
- 上传一个文件进行分块上传,上传一半不传了,之前上传到minio的分块文件要清理吗?怎么做的?
- 在数据库中有一张文件表记录minio中存储的文件信息。
- 文件开始上传时会写入文件表,状态为上传中,上传完成会更新状态为上传完成。
- 当一个文件传了一半不再上传了说明该文件没有上传完成,会有定时任务去查询文件表中的记录,如果文件未上传完成则删除minio中没有上传成功的文件目录。
绑定媒资
需求分析
业务流程
-
到目前为止,媒资管理已完成文件上传、视频处理、我的媒资功能等基本功能,其它模块可以使用媒资文件,本节要讲解课程计划绑定媒资文件。
-
如何将课程计划绑定媒资呢?
-
首先进入课程计划界面,然后选择要绑定的视频进行绑定即可。
-
具体的业务流程如下:
- 教育机构用户进入课程管理页面并编辑某一个课程,在"课程大纲"标签页的某一小节后可点击”添加视频“。
- 弹出添加视频对话框,可通过视频关键字搜索已审核通过的视频媒资。
- 选择视频媒资,点击提交按钮,完成课程计划绑定媒资流程。
数据模型
- 课程计划绑定媒资文件后存储至课程计划绑定媒资表
接口定义
-
根据业务流程,用户进入课程计划列表,首先确定向哪个课程计划添加视频,点击”添加视频“后用户选择视频,选择视频,点击提交,前端以json格式请求以下参数:
-
提交媒资文件id、文件名称、教学计划id
-
示例如下:
-
|
|
-
此接口在内容管理模块提供。
-
在内容管理模块定义请求参数模型类型:
|
|
- 在
TeachplanController
类中定义接口如下:
|
|
接口开发
DAO开发
- 使用teachplanMedia表自动生成的Mapper即可。
Service开发
- 在
TeachplanService
中根据需求定义service接口
|
|
- 定义接口实现
|
|
接口层完善
- 完善接口层调用Service层的代码
|
|
接口测试
- 使用httpclient测试
|
|
-
前后端联调
-
此功能较为简单推荐直接前后端联调
-
向指定课程计划添加视频即可
-
解除绑定媒资
需求分析
业务流程
- 点击已经绑定的视频名称即可解除绑定。
数据模型
- 课程计划接触绑定媒资文件后要删除在课程计划绑定媒资表中的该条数据
接口定义
- 接口定义如下:
|
|
- Controller层接口如下
|
|
接口开发
DAO开发
- 直接使用自动生成的mapper即可
Service开发
- service接口定义
|
|
- service接口实现
|
|
接口层完善
|
|
接口测试
- 开发完成使用httpclient测试、前后端联调,此处直接前后端联调即可
|
|