实现Quartz.NET的HTTP作业调度
Quartz.NET作为一个开源的作业调度库,广泛应用于.NET应用程序中,以实现复杂的定时任务,本次记录利用Quartz.NET实现HTTP作业调度,通过自定义HTTP作业,实现对外部API的定时调用和如何管理这些作业,包括创建、修改、暂停、恢复和删除作业。
1.首先定义了一个
HttpJob
类,该类实现了
IJob
接口,用于执行HTTP请求。利用了
RestRequest
来构建请求,并通过静态字典
Delegates
存储每个作业的配置信息,如URL、请求方法和请求头等
public classHttpJob : IJob
{public static readonly Dictionary<string, HttpJobInfo> Delegates = new();public asyncTask Execute(IJobExecutionContext context)
{var delegateKey = context.JobDetail.JobDataMap.GetString("delegateKey");if (delegateKey != null && Delegates.TryGetValue(delegateKey, out varfunc))
{var requestBody = newRestRequest();if (func.Headers != null)
{foreach (var header infunc.Headers)
{
requestBody.AddHeader(header.Key, header.Value);
}
}var content =HttpHelper.HttpRequest(func.Url, func.Request, requestBody);
JobLogHelper.AddJobLog(new JobLog() { JobName = context.JobDetail.Key.Name, GroupName = context.JobDetail.Key.Group, RunTime = DateTime.Now, RunResult =content });
UpdateLastExecutionTime(context.JobDetail.Key.Name, context.JobDetail.Key.Group, DateTime.Now);
}awaitTask.CompletedTask;
}
}
2.作业信息的持久化:为了持久化作业信息,定义了
JobInfo
类来存储作业的基本信息,如名称、组名、Cron表达式等,并将这些信息保存在本地的JSON文件中。
public classJobInfo
{public required string JobName { get; set; }public required string GroupName { get; set; }public required string CronExpression { get; set; }public DateTime LastExecutionTime { get; set; }public JobStatus Status { get; set; }public required HttpJobInfo HttpJob { get; set; }
}
3.实现了
QuartzHelper
类,用于管理作业的生命周期。这包括加载作业信息、创建作业、调度作业、暂停/恢复作业以及删除作业等功能。
public classQuartzHelper
{privateIScheduler scheduler;private List<JobInfo>jobInfos;private string filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "jobs.json");/// <summary> ///构造函数,初始化定时任务管理器/// </summary> publicQuartzHelper()
{
ISchedulerFactory schedulerFactory= newStdSchedulerFactory();
scheduler=schedulerFactory.GetScheduler().Result;
scheduler.Start().Wait();
LoadJobInfosApi().Wait();
}/// <summary> ///保存作业信息到本地 JSON 文件/// </summary> private voidSaveJobInfos()
{string json =JsonConvert.SerializeObject(jobInfos);
File.WriteAllText(filePath, json);
}/// <summary> ///加载本地 JSON 文件中的作业信息/// </summary> private asyncTask LoadJobInfosApi()
{if(File.Exists(filePath))
{string json =File.ReadAllText(filePath);
jobInfos= JsonConvert.DeserializeObject<List<JobInfo>>(json) ?? new List<JobInfo>();foreach (var jobInfo injobInfos)
{//创建委托的唯一键 var delegateKey =Guid.NewGuid().ToString();//将委托存储在静态字典中 HttpJob.Delegates[delegateKey] =jobInfo.HttpJob;//创建并调度作业 IJobDetail job = JobBuilder.Create<HttpJob>()
.WithIdentity(jobInfo.JobName, jobInfo.GroupName).UsingJobData("delegateKey", delegateKey) //将委托的键添加到JobDataMap .Build();
ITrigger trigger=TriggerBuilder.Create()
.WithIdentity(jobInfo.JobName, jobInfo.GroupName)
.WithCronSchedule(jobInfo.CronExpression)//.StartNow() .Build();awaitscheduler.ScheduleJob(job, trigger);//根据任务状态恢复或暂停任务 if (jobInfo.Status ==JobStatus.正常运行)
{awaitResumeJob(jobInfo.JobName, jobInfo.GroupName);
}else{awaitPauseJob(jobInfo.JobName, jobInfo.GroupName);
}
}
}else{
jobInfos= new List<JobInfo>();
}
}#region 执行普通任务时使用,传委托时可以参考此方法 /////<summary> ///// 新建任务并立即执行/////</summary> //[Obsolete("执行普通任务时使用,可以传委托使用")]//public async Task AddJob(string jobName, string groupName, string cronExpression, Func<bool> func, string description = "")//{//if (jobInfos.Any(c => c.JobName == jobName && c.GroupName == groupName))//{//return;//}// //创建委托的唯一键//var delegateKey = Guid.NewGuid().ToString();// //将委托存储在静态字典中// //MyJobClass.Delegates[delegateKey] = func;// //创建作业信息并保存到列表 需要将func 加入到jobInfo 中做作业持久化!!!!//var jobInfo = new JobInfo { JobName = jobName, GroupName = groupName, CronExpression = cronExpression, Status = JobStatus.正常运行, Description = description, JobCreateTime = DateTime.Now };//jobInfos.Add(jobInfo);//SaveJobInfos();// //创建Quartz作业和触发器//IJobDetail job = JobBuilder.Create<MyJobClass>()//.WithIdentity(jobName, groupName)//.UsingJobData("delegateKey", delegateKey)//将委托的键添加到JobDataMap//.Build();//ITrigger trigger = TriggerBuilder.Create()//.WithIdentity(jobName + "Trigger", groupName)//.StartNow()//.WithCronSchedule(cronExpression).WithDescription(description)//.Build();//await scheduler.ScheduleJob(job, trigger);//} #endregion /// <summary> ///新建任务并立即执行/// </summary> public async Task AddJobApi(string jobName, string groupName, string cronExpression, HttpJobInfo httpJobInfo, string description = "")
{if (jobInfos.Any(c => c.JobName == jobName && c.GroupName ==groupName))
{return;
}//创建委托的唯一键 var delegateKey =Guid.NewGuid().ToString();//将委托存储在静态字典中 HttpJob.Delegates[delegateKey] =httpJobInfo;//创建作业信息并保存到列表 需要将func 加入到jobInfo 中做作业持久化!!!! var jobInfo = new JobInfo { JobName = jobName, GroupName = groupName, CronExpression = cronExpression, HttpJob = httpJobInfo, Status = JobStatus.正常运行, Description = description, JobCreateTime =DateTime.Now };
jobInfos.Add(jobInfo);
SaveJobInfos();//创建Quartz作业和触发器 IJobDetail job = JobBuilder.Create<HttpJob>()
.WithIdentity(jobName, groupName)
.UsingJobData("delegateKey", delegateKey) //将委托的键添加到JobDataMap .Build();
ITrigger trigger=TriggerBuilder.Create()
.WithIdentity(jobName+ "Trigger", groupName)
.StartNow()
.WithCronSchedule(cronExpression).WithDescription(description)
.Build();awaitscheduler.ScheduleJob(job, trigger);
}/// <summary> ///暂停任务/// </summary> public async Task PauseJob(string jobName, stringgroupName)
{await scheduler.PauseJob(newJobKey(jobName, groupName));var job = jobInfos.FirstOrDefault(j => j.JobName == jobName && j.GroupName ==groupName);if (job != null)
{
job.Status=JobStatus.暂停;
SaveJobInfos();
}
}/// <summary> ///开启任务/// </summary> public async Task ResumeJob(string jobName, stringgroupName)
{await scheduler.ResumeJob(newJobKey(jobName, groupName));var job = jobInfos.FirstOrDefault(j => j.JobName == jobName && j.GroupName ==groupName);if (job != null)
{
job.Status=JobStatus.正常运行;
SaveJobInfos();
}
}/// <summary> ///立即执行任务/// </summary> public async Task TriggerJob(string jobName, stringgroupName)
{await scheduler.TriggerJob(newJobKey(jobName, groupName));var job = jobInfos.FirstOrDefault(j => j.JobName == jobName && j.GroupName ==groupName);if (job != null)
{
job.LastExecutionTime=DateTime.Now;
SaveJobInfos();
}
}/// <summary> ///修改任务/// </summary> public async Task ModifyJob(string jobName, string groupName, string cronExpression, HttpJobInfo httpJobInfo, string description = "")
{awaitDeleteJob(jobName, groupName);awaitAddJobApi(jobName, groupName, cronExpression, httpJobInfo, description);
}/// <summary> ///删除任务/// </summary> public async Task DeleteJob(string jobName, stringgroupName)
{await scheduler.DeleteJob(newJobKey(jobName, groupName));
jobInfos.RemoveAll(j=> j.JobName == jobName && j.GroupName ==groupName);
SaveJobInfos();
}/// <summary> ///获取当前所有任务列表/// </summary> public List<JobInfo>GetAllJobs()
{if(File.Exists(filePath))
{string json =File.ReadAllText(filePath);
jobInfos= JsonConvert.DeserializeObject<List<JobInfo>>(json) ?? new List<JobInfo>();returnjobInfos;
}else return null;
}
}
QuartzHelper
4.为了跟踪作业的执行情况,设计了
JobLog
类和
JobLogHelper
类,用于记录和查询作业执行日志。
public classJobLogHelper
{private static string_filePath;/// <summary> ///根据作业名称和组名称获取当日的作业执行日志/// </summary> /// <param name="jobName"></param> /// <param name="groupName"></param> /// <returns></returns> public static List<JobLog> GetJobLog(string jobName, stringgroupName)
{
_filePath= Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"jobsLog-{DateTime.Now:yyyyMMdd}.json");//检查文件是否存在 if (!File.Exists(_filePath))
{return new List<JobLog>();
}var jsonText = $"[{File.ReadAllText(_filePath)}]";var list = JsonConvert.DeserializeObject<List<JobLog>>(jsonText);if (list != null)
{var result = list.Where(c => c.JobName == jobName && groupName == c.GroupName).OrderByDescending(c =>c.RunTime).ToList();returnresult;
}return null;
}/// <summary> ///获取所有的 作业执行日志 //可以从这里拓展其他查询条件/// </summary> /// <returns></returns> public static List<JobLog>GetAllLogs()
{
List<JobLog> jobLogs = new List<JobLog>();var logFilePaths = Directory.GetFiles(AppDomain.CurrentDomain.BaseDirectory, "jobsLog-*.json");
logFilePaths.ToList().ForEach(c=>{var jsonText = $"[{File.ReadAllText(_filePath)}]";var list = JsonConvert.DeserializeObject<List<JobLog>>(jsonText);if (list != null) jobLogs.AddRange(list);
});returnjobLogs;
}/// <summary> ///添加作业执行日志/// </summary> /// <param name="jobLog"></param> public static voidAddJobLog(JobLog jobLog)
{
_filePath= Path.Combine(AppDomain.CurrentDomain.BaseDirectory, $"jobsLog-{DateTime.Now:yyyyMMdd}.json");string json = JsonConvert.SerializeObject(jobLog) + ",\n";
File.AppendAllText(_filePath, json);
}
}
作业执行日志
5.最后,通过ASP.NET Core的Controller提供了一系列Web API接口,以便于通过HTTP请求管理作业。这些接口包括获取作业列表、添加作业、修改作业、删除作业、暂停作业、恢复作业和立即执行作业等。
[Route("api/[controller]")]
[ApiController]public classQuartzController : ControllerBase
{private readonlyQuartzHelper _quartzHelper;publicQuartzController(QuartzHelper quartzHelper)
{
_quartzHelper=quartzHelper;
}
[HttpGet]
[Route("job/GetJobs")]public objectGetJobs()
{return Ok(new {code=200,data =_quartzHelper.GetAllJobs() });
}
[HttpGet]
[Route("job/GetJobLog")]public object GetJobLog(string jobName, stringgroupName)
{return Ok(new { code = 200, data =JobLogHelper.GetJobLog(jobName, groupName) });
}
[HttpGet]
[Route("job/GetJobLogs")]public objectGetJobLogs()
{return Ok(new { code = 200, data =JobLogHelper.GetAllLogs() });
}
[HttpPost]
[Route("job/AddJob")]public async Task<object>Add(JobInfo jobInfo)
{try{await_quartzHelper.AddJobApi(jobInfo.JobName, jobInfo.GroupName, jobInfo.CronExpression, jobInfo.HttpJob, jobInfo.Description);return Ok(new { code = 200, msg = "创建成功!"});
}catch(Exception ex)
{return Ok(new { code = 500, msg =ex.Message });
}
}
[HttpPost]
[Route("job/ModifyJob")]public async Task<object>Edit(JobInfo jobInfo)
{try{await_quartzHelper.ModifyJob(jobInfo.JobName, jobInfo.GroupName, jobInfo.CronExpression, jobInfo.HttpJob, jobInfo.Description);return Ok(new { code = 200, msg = "修改成功!"});
}catch(Exception ex)
{return Ok(new { code = 500, msg =ex.Message });
}
}
[HttpGet]
[Route("job/DeleteJob")]public async Task<object> Delete(string jobName, stringgroupName)
{try{await_quartzHelper.DeleteJob(jobName, groupName);return Ok(new { code = 200, msg = "删除成功!"});
}catch(Exception ex)
{return Ok(new { code = 500, msg =ex.Message });
}
}
[HttpGet]
[Route("job/PauseJob")]public async Task<object> PauseJob(string jobName, stringgroupName)
{try{await_quartzHelper.PauseJob(jobName, groupName);return Ok(new { code = 200, msg = "暂停成功!"});
}catch(Exception ex)
{return Ok(new { code = 500, msg =ex.Message });
}
}
[HttpGet]
[Route("job/ResumeJob")]public async Task<object> ResumeJob(string jobName, stringgroupName)
{try{await_quartzHelper.ResumeJob(jobName, groupName);return Ok(new { code = 200, msg = "开启任务成功!"});
}catch(Exception ex)
{return Ok(new { code = 500, msg =ex.Message });
}
}
[HttpGet]
[Route("job/TriggerJob")]public async Task<object> TriggerJob(string jobName, stringgroupName)
{try{await_quartzHelper.TriggerJob(jobName, groupName);return Ok(new { code = 200, msg = "立即执行任务命令已执行!"});
}catch(Exception ex)
{return Ok(new { code = 500, msg =ex.Message });
}
}
}
Web API接口
源码地址:https://github.com/yycb1994/Quartz.Net