|
@@ -0,0 +1,653 @@
|
|
|
+package com.datau.da.service.job.impl;
|
|
|
+
|
|
|
+import com.datau.da.constant.Constant;
|
|
|
+import com.datau.da.dao.TaskDao;
|
|
|
+import com.datau.da.entity.mail.ErrorJobInfo;
|
|
|
+import com.datau.da.entity.remotedb.MdmInfoDatabase;
|
|
|
+import com.datau.da.entity.task.Task;
|
|
|
+import com.datau.da.feign.DaMdmFeign;
|
|
|
+import com.datau.da.job.JobServer;
|
|
|
+import com.datau.da.utils.CommonConfUtil;
|
|
|
+import com.datau.da.utils.ConnectionUtil;
|
|
|
+import com.datau.da.utils.mail.MailUtil;
|
|
|
+import org.apache.commons.lang.time.DateUtils;
|
|
|
+import org.apache.logging.log4j.LogManager;
|
|
|
+import org.apache.logging.log4j.Logger;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileWriter;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.sql.*;
|
|
|
+import java.text.ParseException;
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.UUID;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author 许明
|
|
|
+ * on 2019/3/11.
|
|
|
+ * @version 2.0.0
|
|
|
+ */
|
|
|
+@Component
|
|
|
+public class TriggerJob {
|
|
|
+
|
|
|
+ private static final Logger logger = LogManager.getLogger(TriggerJob.class);
|
|
|
+
|
|
|
+ private static final String TMP_FTP_FILE_PATH = CommonConfUtil.getConf("mysql.into.outfile.path");
|
|
|
+ private static final String TMP_FTP_FILE_SPLIT = CommonConfUtil.getConf("mysql.into.outfile.split");
|
|
|
+
|
|
|
+ public static TriggerJob triggerJob;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private TaskDao taskDao;
|
|
|
+ @Autowired
|
|
|
+ private DaMdmFeign daMdmFeign;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ triggerJob = this;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据jobid执行任务
|
|
|
+ *
|
|
|
+ * @param jobId
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public String execute(String jobId, Date nextFireTime) throws Exception {
|
|
|
+ Task thisJob = JobServer.MEM_JOB_CACHE.get(jobId);
|
|
|
+ String sendStatus = triggerJob.taskDao.getTaskSendStatusById(thisJob.getId());
|
|
|
+ if (thisJob == null) {
|
|
|
+ insertErrorInfoAndCheckSendStatus(thisJob, "未找到JOB[" + jobId + "]");
|
|
|
+ throw new Exception("未找到JOB[" + jobId + "]");
|
|
|
+ }
|
|
|
+ logger.info("JOB[" + jobId + "]开始进行数据采集任务");
|
|
|
+ //获取源库信息
|
|
|
+ MdmInfoDatabase sourceDatabase = packageMdmInfoDatabase(thisJob.getSourceDatabaseId());
|
|
|
+ MdmInfoDatabase targetDatabase = packageMdmInfoDatabase(thisJob.getTargetDatabaseId());
|
|
|
+ //源库数据库类型名称
|
|
|
+ String sResourceTypeName = sourceDatabase.getResourceTypeName();
|
|
|
+ //目标库数据库类型名称
|
|
|
+ String tResourceTypeName = targetDatabase.getResourceTypeName();
|
|
|
+ //开始执行装载任务
|
|
|
+
|
|
|
+ String result = "";
|
|
|
+ result = startGatherJob(sourceDatabase, targetDatabase, thisJob, sResourceTypeName, tResourceTypeName, nextFireTime);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private MdmInfoDatabase packageMdmInfoDatabase(String dbId) {
|
|
|
+ MdmInfoDatabase mdmInfoDatabase = triggerJob.daMdmFeign.database(dbId);
|
|
|
+ String resourceTypeName = mdmInfoDatabase.getResourceTypeName().toLowerCase();
|
|
|
+ String ip = mdmInfoDatabase.getIp();
|
|
|
+ String port = mdmInfoDatabase.getResourcePort();
|
|
|
+ String dbName = mdmInfoDatabase.getDatabaseName();
|
|
|
+ String url = "";
|
|
|
+ switch (resourceTypeName) {
|
|
|
+ case "mysql":
|
|
|
+ //jdbc:mysql://localhost:3306/sqltestdb
|
|
|
+ url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName;
|
|
|
+ break;
|
|
|
+ case "hive":
|
|
|
+ //jdbc:hive2://192.168.31.243:10000/default
|
|
|
+ url = "jdbc:hive2://" + ip + ":" + port + "/" + dbName;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ url = "jdbc:mysql://" + ip + ":" + port + "/" + dbName;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ mdmInfoDatabase.setUrl(url);
|
|
|
+ return mdmInfoDatabase;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行采集任务
|
|
|
+ * @param sourceDatabase
|
|
|
+ * @param targetDatabase
|
|
|
+ * @param thisJob
|
|
|
+ * @param sResourceTypeName
|
|
|
+ * @param tResourceTypeName
|
|
|
+ * @param nextFireTime
|
|
|
+ * @return
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ private String startGatherJob(MdmInfoDatabase sourceDatabase, MdmInfoDatabase targetDatabase, Task thisJob, String sResourceTypeName,
|
|
|
+ String tResourceTypeName, Date nextFireTime) throws Exception{
|
|
|
+ String fileName = "";
|
|
|
+ //获取源库数据库连接
|
|
|
+ Connection sourceConn = null;
|
|
|
+ Connection targetConn = null;
|
|
|
+ String result = "0";
|
|
|
+ String jobid = thisJob.getId();
|
|
|
+ String sql = thisJob.getDdlSql();
|
|
|
+ String targetTableName = thisJob.getTargetTableName();
|
|
|
+ String nextRunDate = formatNextRunDate(thisJob.getPeriodType(), nextFireTime);
|
|
|
+ String runStatus = "0";
|
|
|
+ try {
|
|
|
+ runStatus = "2";//运行中
|
|
|
+ triggerJob.taskDao.updateTaskRunStatusById(jobid, runStatus, nextRunDate);
|
|
|
+
|
|
|
+ sourceConn = ConnectionUtil.initConn(sourceDatabase);
|
|
|
+ // 创建statement类对象,用来执行SQL语句!!
|
|
|
+ Statement sourceStatement = sourceConn.createStatement();
|
|
|
+ if(sql.contains("'#(")){
|
|
|
+ String[] splitSql = sql.split("'#\\(");
|
|
|
+ for (int i = 0; i < splitSql.length; i++) {
|
|
|
+ String date = "";
|
|
|
+ if(i == 0){
|
|
|
+ String tmp = sql.substring(splitSql[0].length() + 3);
|
|
|
+ date = sql.substring(splitSql[0].length() + 3).substring(0, tmp.indexOf(")"));
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ if(sql.contains("'#(")){
|
|
|
+ String aa = sql.substring(sql.indexOf("#") + 2);
|
|
|
+ date = sql.substring(sql.indexOf("#") + 2).substring(0, aa.indexOf(")"));
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+// date = date.replaceAll("-", "").replaceAll(" ", "");
|
|
|
+ sql = formatSql(thisJob, date, sql);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(sql.contains("'#(")){
|
|
|
+ logger.error("DDL语句中日期通配符配置有问题,请检查!" + sql);
|
|
|
+ insertErrorInfoAndCheckSendStatus(thisJob, "DDL语句中日期通配符配置有问题,请检查!" + sql);
|
|
|
+ result = "-1";
|
|
|
+ throw new Exception("DDL语句中日期通配符配置有问题,请检查!");
|
|
|
+ }
|
|
|
+ logger.info("JOB[" + jobid + "]执行DDL语句为:" + sql);
|
|
|
+ ResultSet rs = sourceStatement.executeQuery(sql);
|
|
|
+
|
|
|
+ String tmpFilePath = System.getProperty("user.dir") + TMP_FTP_FILE_PATH;
|
|
|
+ if (StringUtils.isEmpty(tmpFilePath)) {
|
|
|
+ tmpFilePath = TMP_FTP_FILE_PATH + "/";
|
|
|
+ }
|
|
|
+ String outFileName = tmpFilePath + "/mysql_tmp_file_" + UUID.randomUUID() + ".txt";
|
|
|
+
|
|
|
+ // TODO: 2019/3/15 后续可以考虑自定义输出文件名及路径
|
|
|
+ logger.info("JOB[" + jobid + "]开始导出数据到本地" + outFileName + "文件.....");
|
|
|
+ //数据输出到本地文件
|
|
|
+ InputStream dataStream = writeToLocalFile(tmpFilePath, outFileName, rs);
|
|
|
+ logger.info("JOB[" + jobid + "]数据导出到本地" + outFileName + "文件结束.....");
|
|
|
+ sourceConn.close();
|
|
|
+
|
|
|
+ runStatus = "3";//运行完成
|
|
|
+ result = "0";
|
|
|
+ logger.info("获取目标库数据库连接...");
|
|
|
+ targetConn = ConnectionUtil.initConn(targetDatabase);
|
|
|
+
|
|
|
+ //拼接装载语句
|
|
|
+ String loadSql = formatDDLSql(tResourceTypeName.toLowerCase(), outFileName, sql, targetTableName);
|
|
|
+ PreparedStatement targetStatement = targetConn.prepareStatement(loadSql);
|
|
|
+
|
|
|
+// Statement targetStatement = (com.mysql.jdbc.Statement)targetConn.createStatement();
|
|
|
+ if(dataStream != null){
|
|
|
+// ((com.mysql.jdbc.Statement) targetStatement).setLocalInfileInputStream(dataStream);
|
|
|
+
|
|
|
+ logger.info("JOB[" + jobid + "]拼接装载语句[" + loadSql + "]");
|
|
|
+ File file = new File(outFileName);
|
|
|
+ fileName = outFileName;
|
|
|
+ if (file.exists()){
|
|
|
+ logger.info("JOB[" + jobid + "]开始执行导入文本文件到目标库语句:[" + loadSql + "]");
|
|
|
+ if (targetStatement.isWrapperFor(com.mysql.jdbc.Statement.class)) {
|
|
|
+ com.mysql.jdbc.PreparedStatement mysqlStatement = targetStatement.unwrap(com.mysql.jdbc.PreparedStatement.class);
|
|
|
+ mysqlStatement.setLocalInfileInputStream(dataStream);
|
|
|
+ mysqlStatement.executeUpdate();
|
|
|
+ }
|
|
|
+// targetStatement.exe();
|
|
|
+ logger.info("JOB[" + jobid + "]执行导入文本文件到目标库语句:[" + loadSql + "] 结束");
|
|
|
+ logger.info(outFileName + "数据文件装载成功");
|
|
|
+ // 更新数据库中任务状态为运行完成
|
|
|
+ runStatus = "3";//运行完成
|
|
|
+ result = "0";
|
|
|
+
|
|
|
+ // TODO: 2019/3/15 后续可能会添加是否删除文件的选项,需要进行判断
|
|
|
+ file.delete();
|
|
|
+ } else {
|
|
|
+ insertErrorInfoAndCheckSendStatus(thisJob, "JOB[" + jobid + "]未找到导出文件[" + outFileName + "],请检查!");
|
|
|
+ logger.error("JOB[" + jobid + "]未找到导出文件[" + outFileName + "],请检查!");
|
|
|
+ runStatus = "4";//运行失败
|
|
|
+ result = "-1";
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ triggerJob.taskDao.updateTaskRunStatusById(jobid, runStatus, nextRunDate);
|
|
|
+ triggerJob.taskDao.updateSendedStatusById(jobid, "0");
|
|
|
+ targetConn.close();
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ runStatus = "4";//运行失败
|
|
|
+ result = "-1";
|
|
|
+ logger.info("执行调度异常,信息为:" + e.getMessage());
|
|
|
+ try {
|
|
|
+ insertErrorInfoAndCheckSendStatus(thisJob, e.getMessage());
|
|
|
+ } catch (Exception e1){
|
|
|
+ e1.getStackTrace();
|
|
|
+ }
|
|
|
+ triggerJob.taskDao.updateTaskRunStatusById(jobid, runStatus, nextRunDate);
|
|
|
+ } finally {
|
|
|
+ try {
|
|
|
+ if (sourceConn != null) {
|
|
|
+ sourceConn.close();
|
|
|
+ }
|
|
|
+ if (targetConn != null) {
|
|
|
+ targetConn.close();
|
|
|
+ }
|
|
|
+ // TODO: 2019/3/15 后续可能会添加是否删除文件的选项,需要进行判断
|
|
|
+ File file = new File(fileName);
|
|
|
+ if(file.exists()){
|
|
|
+ file.delete();
|
|
|
+ }
|
|
|
+ } catch (SQLException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ try {
|
|
|
+ insertErrorInfoAndCheckSendStatus(thisJob, e.getMessage());
|
|
|
+ } catch (Exception e1){
|
|
|
+ e1.getStackTrace();
|
|
|
+ }
|
|
|
+ logger.info("关闭数据源连接或删除临时文件异常,信息为:" + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 数据输出到本地文件
|
|
|
+ * @param localFilePath
|
|
|
+ * @param rs
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ private static InputStream writeToLocalFile(String tmpFilePath, String localFilePath, ResultSet rs) throws Exception{
|
|
|
+ File file = new File(tmpFilePath);
|
|
|
+ if(!file.exists()){
|
|
|
+ logger.info("新增目录[" + tmpFilePath + "]");
|
|
|
+ file.mkdirs();
|
|
|
+ }
|
|
|
+ ResultSetMetaData data = rs.getMetaData();
|
|
|
+ FileWriter fw = new FileWriter(localFilePath);
|
|
|
+ StringBuilder builder = new StringBuilder();
|
|
|
+ while(rs.next()){
|
|
|
+ for (int i = 1; i <= data.getColumnCount(); i++) {
|
|
|
+ String columnClassName = data.getColumnClassName(i);
|
|
|
+ if(i == (data.getColumnCount())){
|
|
|
+ if("java.lang.Integer".equals(columnClassName) || "java.lang.Double".equals(columnClassName)){
|
|
|
+ if(rs.getString(i) == null || "null".equals(rs.getString(i)) || "".equals(rs.getString(i))){
|
|
|
+ fw.write("\"" + "0" + "\"");
|
|
|
+ builder.append("\"" + "0" + "\"");
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ fw.write("\"" + rs.getString(i) + "\"");
|
|
|
+ builder.append("\"" + rs.getString(i) + "\"");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ fw.write("\"" + (rs.getString(i)==null ? "\\N" : rs.getString(i)) + "\"");
|
|
|
+ builder.append("\"" + (rs.getString(i)==null ? "\\N" : rs.getString(i)) + "\"");
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if("java.lang.Integer".equals(columnClassName) || "java.lang.Double".equals(columnClassName)){
|
|
|
+ if(rs.getString(i) == null || "null".equals(rs.getString(i)) || "".equals(rs.getString(i))){
|
|
|
+ fw.write("\""+ "0" + "\"" + TMP_FTP_FILE_SPLIT);
|
|
|
+ builder.append("\"" + "0" + "\"" + TMP_FTP_FILE_SPLIT);
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ fw.write("\"" + rs.getString(i) + "\"" + TMP_FTP_FILE_SPLIT);
|
|
|
+ builder.append("\"" + rs.getString(i) + "\"" + TMP_FTP_FILE_SPLIT);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ fw.write("\"" + (rs.getString(i)==null ? "\\N" : rs.getString(i)) + "\"" + TMP_FTP_FILE_SPLIT);
|
|
|
+ builder.append("\"" + (rs.getString(i)==null ? "\\N" : rs.getString(i)) + "\"" + TMP_FTP_FILE_SPLIT);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ fw.write(System.lineSeparator());
|
|
|
+ builder.append(System.lineSeparator());
|
|
|
+ }
|
|
|
+ fw.close();
|
|
|
+ byte[] bytes = builder.toString().getBytes("GBK");
|
|
|
+ InputStream inputStream = null;
|
|
|
+ if(bytes.length != 0){
|
|
|
+ inputStream = new ByteArrayInputStream(bytes);
|
|
|
+ }
|
|
|
+ return inputStream;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 封装ddl语句用于数据导出和装载
|
|
|
+ *
|
|
|
+ * @param sourceTypeName
|
|
|
+ * @param fileName
|
|
|
+ * @param sql
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static String formatDDLSql(String sourceTypeName, String fileName, String sql, String targetTableName) {
|
|
|
+ String loadSql = "";
|
|
|
+ sql = sql.toLowerCase().trim();
|
|
|
+ String loadColumn = sql.substring(6, sql.indexOf("from")).replaceAll(" ", "");
|
|
|
+ //如果sql中查询字段为*,则装载时不指定字段,mysql顺序写入
|
|
|
+ //如果指定了字段,则需要添加具体入库字段,并忽略其他不入库的字段 @dummy
|
|
|
+ loadColumn = "*".equals(loadColumn) ? "" : "(" + loadColumn + ",@dummy)";
|
|
|
+ if ("mysql".equals(sourceTypeName.toLowerCase())) {
|
|
|
+ logger.info("当前数据库类型是mysql...");
|
|
|
+ loadSql = "LOAD DATA LOCAL INFILE '" + fileName
|
|
|
+ + "' REPLACE INTO TABLE " + targetTableName + " character set gbk fields ENCLOSED BY '\"'" +
|
|
|
+ " terminated by '" + TMP_FTP_FILE_SPLIT + "' LINES TERMINATED BY '"+ System.lineSeparator() +"' " + loadColumn;
|
|
|
+ }
|
|
|
+ if ("hive".equals(sourceTypeName.toLowerCase())) {
|
|
|
+ logger.info("当前数据库类型是hive...");
|
|
|
+ }
|
|
|
+ return loadSql;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据调度周期类型替换抽取sql的日期
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static String formatSql(Task task, String date, String sql) throws Exception{
|
|
|
+ int periodType = task.getPeriodType();
|
|
|
+ SimpleDateFormat format = new SimpleDateFormat();
|
|
|
+ int offset = task.getScheduleOffset();
|
|
|
+ int num = 0;
|
|
|
+ Date realDate = null;
|
|
|
+ String tmpFormat = "";
|
|
|
+ switch (periodType)
|
|
|
+ {
|
|
|
+ case Constant.JOB_PERIOD_CUSTOM:
|
|
|
+ String dataformat = getTaskDataFormat(task.getTaskCron());
|
|
|
+ format.applyPattern(dataformat);
|
|
|
+ String tmpformat = dataformat.toUpperCase().replaceAll("-", "").replaceAll(" ", "").replaceAll(":", "");
|
|
|
+ String period = tmpformat.substring(tmpformat.length() - 1);
|
|
|
+ num = getOffsetByPeriod(date, period);
|
|
|
+ realDate = getRealDateByDataFormat(tmpformat, offset + num);
|
|
|
+// tmpFormat = getReplaceStrByPeriod(date, dataformat.toUpperCase(), num, period);
|
|
|
+// tmpFormat = tmpFormat.replaceAll("-", "").replaceAll(" ", "").replaceAll(":", "");
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_MONTH:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_MONTH);
|
|
|
+ num = getOffsetByPeriod(date, "M");
|
|
|
+ realDate = DateUtils.addMonths(new Date(), offset + num);
|
|
|
+// tmpFormat = getReplaceStrByPeriod(date, "YYYYMM", num, "M");
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_DAY:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_DAY);
|
|
|
+ num = getOffsetByPeriod(date, "D");
|
|
|
+ realDate = DateUtils.addDays(new Date(), offset + num);
|
|
|
+// tmpFormat = getReplaceStrByPeriod(date, "YYYYMMDD", num, "D");
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_HOUR:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_HOUR);
|
|
|
+ num = getOffsetByPeriod(date, "H");
|
|
|
+ realDate = DateUtils.addHours(new Date(), offset + num);
|
|
|
+// tmpFormat = getReplaceStrByPeriod(date, "YYYYMMDDHH", num, "H");
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_MINUTE:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_MINUTE);
|
|
|
+ num = getOffsetByPeriod(date, "M");
|
|
|
+ realDate = DateUtils.addMinutes(new Date(), offset + num);
|
|
|
+// tmpFormat = getReplaceStrByPeriod(date, "YYYYMMDDHHMM", num, "M");
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_SECONDS:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_SECONDS);
|
|
|
+ num = getOffsetByPeriod(date, "S");
|
|
|
+ realDate = DateUtils.addSeconds(new Date(), offset + num);
|
|
|
+// tmpFormat = getReplaceStrByPeriod(date, "YYYYMMDDHHMMSS", num, "S");
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return sql.replaceAll("#\\(" + date + "\\)", format.format(realDate));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param job 一个JOB任务
|
|
|
+ * @return Date:按 不同周期类型格式化后的当前时间
|
|
|
+ * @description 获取当前JOB的下次运行时间的参照时间点
|
|
|
+ */
|
|
|
+ private static Date getReference(Task job) {
|
|
|
+ Date current = null;
|
|
|
+ try {
|
|
|
+ job.setDataFormat(getDataFormatByPeriodType(job.getPeriodType()));
|
|
|
+ SimpleDateFormat format = new SimpleDateFormat(job.getDataFormat());
|
|
|
+ String current4format = format.format(new Date());
|
|
|
+ current = format.parse(current4format);
|
|
|
+ } catch (ParseException e) {
|
|
|
+ try{
|
|
|
+ insertErrorInfoAndCheckSendStatus(job, "JOB[" + job.getId() + "]数据日期和数据日期格式不匹配!" + e.getMessage());
|
|
|
+ }catch (Exception e1){
|
|
|
+ e1.getStackTrace();
|
|
|
+ }
|
|
|
+ logger.error("JOB[" + job.getId() + "]数据日期和数据日期格式不匹配!", e);
|
|
|
+ }
|
|
|
+ return current;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String getDataFormatByPeriodType(int periodType){
|
|
|
+ String dataFormat = "";
|
|
|
+ switch (periodType){
|
|
|
+ case 1:
|
|
|
+ dataFormat = "yyyyMM";
|
|
|
+ break;
|
|
|
+ case 2:
|
|
|
+ dataFormat = "yyyyMMdd";
|
|
|
+ break;
|
|
|
+ case 3:
|
|
|
+ dataFormat = "yyyyMMddHH";
|
|
|
+ break;
|
|
|
+ case 4:
|
|
|
+ dataFormat = "yyyyMMddHHmm";
|
|
|
+ break;
|
|
|
+ case 5:
|
|
|
+ dataFormat = "yyyyMMddHHmmss";
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ dataFormat = "yyyyMMddHHmmss";
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return dataFormat;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据周期类型获取通配便宜值
|
|
|
+ * @param date
|
|
|
+ * @param period
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static int getOffsetByPeriod(String date, String period) throws Exception{
|
|
|
+ date = date.toUpperCase();
|
|
|
+ int num = 0;
|
|
|
+ try{
|
|
|
+ if (date.contains("_-")){
|
|
|
+ num = Integer.valueOf(date.substring(date.indexOf("_") + 2, date.lastIndexOf(period)));
|
|
|
+ return num;
|
|
|
+ }
|
|
|
+ else if(date.contains("_")){
|
|
|
+ num = Integer.valueOf(date.substring(date.indexOf("_") + 1, date.lastIndexOf(period)));
|
|
|
+ return 0 - num;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception e){
|
|
|
+ logger.error("当前调度周期类型为:["+ period + "],通配符和当前周期类型不匹配!");
|
|
|
+ throw new Exception("当前调度周期类型为:["+ period + "],通配符和当前周期类型不匹配!");
|
|
|
+ }
|
|
|
+ return num;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据周期类型获取通配便宜值
|
|
|
+ * @param date
|
|
|
+ * @param period
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static String getReplaceStrByPeriod(String date, String dateFormat, int num, String period){
|
|
|
+ num = num < 0 ? Math.abs(num) : num;
|
|
|
+ date = date.toUpperCase();
|
|
|
+ if (date.contains("_-")){
|
|
|
+ dateFormat = dateFormat + "_-" + num + period;
|
|
|
+ return dateFormat;
|
|
|
+ }
|
|
|
+ if(date.contains("_")){
|
|
|
+ dateFormat = dateFormat + "_" + num + period;
|
|
|
+ return dateFormat;
|
|
|
+ }
|
|
|
+ return dateFormat;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据日期格式和调度偏移量,确认实际替换的日期
|
|
|
+ * @param dataformat
|
|
|
+ * @param offset
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static Date getRealDateByDataFormat(String dataformat, int offset){
|
|
|
+ Date realDate = null;
|
|
|
+ switch (dataformat){
|
|
|
+ case "YYYY":
|
|
|
+ realDate = DateUtils.addYears(new Date(), offset);
|
|
|
+ break;
|
|
|
+ case "YYYYMM":
|
|
|
+ realDate = DateUtils.addMonths(new Date(), offset);
|
|
|
+ break;
|
|
|
+ case "YYYYMMDD":
|
|
|
+ realDate = DateUtils.addDays(new Date(), offset);
|
|
|
+ break;
|
|
|
+ case "YYYYMMDDHH":
|
|
|
+ realDate = DateUtils.addHours(new Date(), offset);
|
|
|
+ break;
|
|
|
+ case "YYYYMMDDHHMM":
|
|
|
+ realDate = DateUtils.addMinutes(new Date(), offset);
|
|
|
+ break;
|
|
|
+ case "YYYYMMDDHHMMSS":
|
|
|
+ realDate = DateUtils.addSeconds(new Date(), offset);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ realDate = DateUtils.addSeconds(new Date(), offset);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return realDate;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String formatNextRunDate(int periodType, Date nextFireTime){
|
|
|
+ if(nextFireTime != null){
|
|
|
+ SimpleDateFormat format = new SimpleDateFormat();
|
|
|
+ switch (periodType)
|
|
|
+ {
|
|
|
+ case Constant.JOB_PERIOD_CUSTOM:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_SECONDS);
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_MONTH:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_MONTH);
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_DAY:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_DAY);
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_HOUR:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_HOUR);
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_MINUTE:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_MINUTE);
|
|
|
+ break;
|
|
|
+ case Constant.JOB_PERIOD_SECONDS:
|
|
|
+ format.applyPattern(Constant.RE_FORMAT_SECONDS);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return format.format(nextFireTime);
|
|
|
+ }
|
|
|
+ return "";
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String getTaskDataFormat(String taskCron) throws Exception{
|
|
|
+ String[] crons = taskCron.split(" ");
|
|
|
+ int count = -1;
|
|
|
+ String dataFormat = "";
|
|
|
+ for (int i = 0; i < crons.length; i++) {
|
|
|
+ count ++;
|
|
|
+ if("*".equals(crons[i]) || crons[i].contains("*/")){
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ switch (count){
|
|
|
+ case 1:
|
|
|
+ dataFormat = Constant.RE_FORMAT_MINUTE;
|
|
|
+ break;
|
|
|
+ case 2:
|
|
|
+ dataFormat = Constant.RE_FORMAT_HOUR;
|
|
|
+ break;
|
|
|
+ case 3:
|
|
|
+ dataFormat = Constant.RE_FORMAT_DAY;
|
|
|
+ break;
|
|
|
+ case 4:
|
|
|
+ dataFormat = Constant.RE_FORMAT_MONTH;
|
|
|
+ break;
|
|
|
+ case 5:
|
|
|
+ dataFormat = Constant.RE_FORMAT_YEAR;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ dataFormat = Constant.RE_FORMAT_SECONDS;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return dataFormat;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取出错任务信息
|
|
|
+ * @param taskId
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public static ErrorJobInfo getErrorJobInfoById(String taskId){
|
|
|
+ return triggerJob.taskDao.getErrorJobInfoById(taskId);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 更新出错任务的邮件发送状态
|
|
|
+ * @param taskId
|
|
|
+ * @param sendStatus
|
|
|
+ */
|
|
|
+ public static void updateSendedStatusById(String taskId, String sendStatus){
|
|
|
+ triggerJob.taskDao.updateSendedStatusById(taskId, sendStatus);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 写入出错信息并校验是否需要发送邮件
|
|
|
+ * @param task
|
|
|
+ * @param errorInfo
|
|
|
+ */
|
|
|
+ private static void insertErrorInfoAndCheckSendStatus(Task task, String errorInfo) throws Exception{
|
|
|
+ String sendStatus = triggerJob.taskDao.getTaskSendStatusById(task.getId());
|
|
|
+ ErrorJobInfo errorJobInfo = new ErrorJobInfo();
|
|
|
+ errorJobInfo.setTaskId(task.getId());
|
|
|
+ errorJobInfo.setTaskName(task.getTaskName());
|
|
|
+ errorJobInfo.setErrorDesc(errorInfo);
|
|
|
+ errorJobInfo.setSendStatus(sendStatus);
|
|
|
+ triggerJob.taskDao.deleteTaskErrorInfo(task.getId());
|
|
|
+ triggerJob.taskDao.insertTaskErrorInfo(errorJobInfo);
|
|
|
+ if("0".equals(sendStatus)){
|
|
|
+ MailUtil.sendErrorTaskInfo(task.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public static String getSendMailsConfig() throws Exception{
|
|
|
+ List<String> sendMails = triggerJob.taskDao.getSendMailsConfig();
|
|
|
+ if(sendMails.isEmpty()){
|
|
|
+ throw new Exception("错误信息需要发送的邮箱地址未配置!");
|
|
|
+ }
|
|
|
+ String mailTo = "";
|
|
|
+ for(String mail : sendMails){
|
|
|
+ if(mail.contains(";")){
|
|
|
+ throw new Exception("要发送的邮箱地址配置不正确!");
|
|
|
+ }
|
|
|
+ mailTo += mail + ";";
|
|
|
+ }
|
|
|
+ return mailTo;
|
|
|
+ }
|
|
|
+}
|