Compare commits

...

4 Commits

13 changed files with 51 additions and 33 deletions

View File

@ -35,17 +35,17 @@ public class EmailService {
* 处理邮件发送任务根据emails添加到邮件发送记录表中 * 处理邮件发送任务根据emails添加到邮件发送记录表中
* *
* @param platform 平台编码 * @param platform 平台编码
* @param dispatchNumber 待处理发送任务列表数量 * @param emailJobId 邮件任务id
* @return: void * @return: void
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/15 * @Date: 2021/9/15
*/ */
@Transactional @Transactional
public void DispatchEmailJobs(String platform, Integer dispatchNumber) { public void DispatchEmailJobs(String platform, Integer emailJobId) {
//获取指定数量待处理列表 //获取指定数量待处理列表
List<EmailJob> emailJobList = new ArrayList<>(); List<EmailJob> emailJobList = new ArrayList<>();
try { try {
emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, NotificationSystemConstant.EMAIL_JOB_NOT_DISPATCHED, dispatchNumber); emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, NotificationSystemConstant.EMAIL_JOB_NOT_DISPATCHED, emailJobId);
} catch (Exception e) { } catch (Exception e) {
logger.error("获取未处理邮件任务列表失败:\n" + e); logger.error("获取未处理邮件任务列表失败:\n" + e);
} }
@ -71,17 +71,17 @@ public class EmailService {
* 发送邮件 * 发送邮件
* *
* @param platform 平台编码 * @param platform 平台编码
* @param sentNumber 一次发送数量 * @param emailJobId 邮件任务id
* @return: void * @return: void
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/15 * @Date: 2021/9/15
*/ */
@Transactional @Transactional
public void sendEmail(String platform, Integer sentNumber) { public void sendEmail(String platform, Integer emailJobId) {
//获取待发送列表 //获取待发送列表
List<EmailSendRecord> emailSendRecordList = new ArrayList<>(); List<EmailSendRecord> emailSendRecordList = new ArrayList<>();
try { try {
emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, NotificationSystemConstant.EMAIL_UNSENT_RECORD, sentNumber); emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, NotificationSystemConstant.EMAIL_UNSENT_RECORD, emailJobId);
} catch (Exception e) { } catch (Exception e) {
logger.error("获取未发送邮件列表失败:\n" + e); logger.error("获取未发送邮件列表失败:\n" + e);
} }

View File

@ -35,9 +35,10 @@ public class EmailJobsListener {
public void messageHandler(String message) { public void messageHandler(String message) {
try { try {
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class); NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
Boolean flag = emailJobsService.createEmailJob(newEmailJobVo); int flag = emailJobsService.createEmailJob(newEmailJobVo);
//if the message is inserted successfully, send a new email-job message to kafka //if the message is inserted successfully, send a new email-job message to kafka
if (flag){ if (flag > 0){
newEmailJobVo.setId(flag);
kafkaUtil.sendMessage(gitlinkNewEmailRemindTopic, JSONObject.toJSONString(newEmailJobVo)); kafkaUtil.sendMessage(gitlinkNewEmailRemindTopic, JSONObject.toJSONString(newEmailJobVo));
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -26,9 +26,9 @@ public class EmailSentListener {
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class); NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
//处理邮件任务 //处理邮件任务
emailService.DispatchEmailJobs(newEmailJobVo.getPlatform(), 1); emailService.DispatchEmailJobs(newEmailJobVo.getPlatform(), newEmailJobVo.getId());
//发送邮件 //发送邮件
emailService.sendEmail(newEmailJobVo.getPlatform(), 100); emailService.sendEmail(newEmailJobVo.getPlatform(), newEmailJobVo.getId());
} }

View File

@ -13,6 +13,8 @@ public class NewEmailJobVo {
//平台编码 //平台编码
private String platform; private String platform;
private Integer id;
@ApiModelProperty(value = "邮件发送者", required = true) @ApiModelProperty(value = "邮件发送者", required = true)
@NotNull(message = "邮件发送者不能为空") @NotNull(message = "邮件发送者不能为空")
@Range(min = Integer.MIN_VALUE, max = Integer.MAX_VALUE) @Range(min = Integer.MIN_VALUE, max = Integer.MAX_VALUE)
@ -40,6 +42,14 @@ public class NewEmailJobVo {
this.platform = platform; this.platform = platform;
} }
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getSender() { public Integer getSender() {
return sender; return sender;
} }

View File

@ -19,5 +19,5 @@ public interface EmailJobsMapper extends BaseMapper<EmailJob> {
int updateByPrimaryKey(@Param("platform") String platform, @Param("record") EmailJob record); int updateByPrimaryKey(@Param("platform") String platform, @Param("record") EmailJob record);
List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("size") Integer size); List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("id") Integer emailJobId);
} }

View File

@ -22,7 +22,7 @@ public interface EmailSendRecordsMapper extends BaseMapper<EmailSendRecord> {
//批量插入邮件发送任务记录 //批量插入邮件发送任务记录
int insertEmailSendRecordBatch(@Param("platform") String platform,@Param("list") List<EmailSendRecord> emailSendRecordList); int insertEmailSendRecordBatch(@Param("platform") String platform,@Param("list") List<EmailSendRecord> emailSendRecordList);
List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("size") Integer sentNumber); List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("jobId") Integer emailJobId);
//批量更新邮件发送任务记录 //批量更新邮件发送任务记录
int updateEmailSendRecordsBatch(@Param("platform") String platform, @Param("list") List<EmailSendRecord> emailSendRecordList); int updateEmailSendRecordsBatch(@Param("platform") String platform, @Param("list") List<EmailSendRecord> emailSendRecordList);

View File

@ -13,23 +13,23 @@ public interface EmailJobsService extends IService<EmailJob> {
* 添加发送邮件任务 * 添加发送邮件任务
* *
* @param newEmailJobVo * @param newEmailJobVo
* @return: boolean * @return: int
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/13 * @Date: 2021/9/13
*/ */
boolean createEmailJob(NewEmailJobVo newEmailJobVo) throws Exception; int createEmailJob(NewEmailJobVo newEmailJobVo) throws Exception;
/** /**
* 获取所有未处理邮件任务列表 * 获取所有未处理邮件任务列表
* *
* @param platform 平台编码 * @param platform 平台编码
* @param dispatchedStatus 发送状态-1 未处理,1 处理成功,2 处理失败 * @param dispatchedStatus 发送状态-1 未处理,1 处理成功,2 处理失败
* @param size 列表大小 * @param emailJobId 邮件任务id
* @return: List<EmailJob> * @return: List<EmailJob>
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/13 * @Date: 2021/9/13
*/ */
List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) throws Exception; List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer emailJobId) throws Exception;
/** /**
* 更新邮件任务状态 * 更新邮件任务状态

View File

@ -23,12 +23,12 @@ public interface EmailSendRecordsService extends IService<EmailSendRecord> {
* *
* @param platform 平台编码 * @param platform 平台编码
* @param status 邮件发送记录状态 -1未发送 1发送成功 2发送失败 * @param status 邮件发送记录状态 -1未发送 1发送成功 2发送失败
* @param size 列表数量 * @param emailJobId 邮件任务id
* @return: List<EmailSendRecord> * @return: List<EmailSendRecord>
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/15 * @Date: 2021/9/15
*/ */
List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer size) throws Exception; List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer emailJobId) throws Exception;
/** /**
* 变更邮件发送记录状态 * 变更邮件发送记录状态

View File

@ -15,16 +15,17 @@ import java.util.List;
public class EmailJobsServiceImpl extends ServiceImpl<EmailJobsMapper, EmailJob> implements EmailJobsService { public class EmailJobsServiceImpl extends ServiceImpl<EmailJobsMapper, EmailJob> implements EmailJobsService {
@Override @Override
public boolean createEmailJob(NewEmailJobVo newEmailJobVo) { public int createEmailJob(NewEmailJobVo newEmailJobVo) {
String platform = newEmailJobVo.getPlatform(); String platform = newEmailJobVo.getPlatform();
EmailJob emailJob = new EmailJob(); EmailJob emailJob = new EmailJob();
BeanUtils.copyProperties(newEmailJobVo, emailJob); BeanUtils.copyProperties(newEmailJobVo, emailJob);
return baseMapper.insertSelective(platform, emailJob) > 0; baseMapper.insertSelective(platform, emailJob);
return newEmailJobVo.getId();
} }
@Override @Override
public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) { public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer emailJobId) {
return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, size); return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, emailJobId);
} }
@Override @Override

View File

@ -35,7 +35,7 @@ public class EmailSendRecordsServiceImpl extends ServiceImpl<EmailSendRecordsMap
} }
@Override @Override
public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer sentNumber){ public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer emailJobId){
return baseMapper.getRecordsByStatus(platform, status, sentNumber); return baseMapper.getRecordsByStatus(platform, status, emailJobId);
} }
} }

View File

@ -32,7 +32,7 @@
#{record.subject,jdbcType=VARCHAR}, #{record.content,jdbcType=VARCHAR}, #{record.createdAt,jdbcType=TIMESTAMP}, #{record.subject,jdbcType=VARCHAR}, #{record.content,jdbcType=VARCHAR}, #{record.createdAt,jdbcType=TIMESTAMP},
#{record.dispatchedAt,jdbcType=TIMESTAMP}, #{record.dispatchedStatus,jdbcType=INTEGER}) #{record.dispatchedAt,jdbcType=TIMESTAMP}, #{record.dispatchedStatus,jdbcType=INTEGER})
</insert> </insert>
<insert id="insertSelective" parameterType="cn.org.gitlink.notification.model.dao.entity.EmailJob" > <insert id="insertSelective" parameterType="cn.org.gitlink.notification.model.dao.entity.EmailJob" useGeneratedKeys="true" keyProperty="id" >
insert into ${platform}_email_jobs insert into ${platform}_email_jobs
<trim prefix="(" suffix=")" suffixOverrides="," > <trim prefix="(" suffix=")" suffixOverrides="," >
<if test="record.id != null" > <if test="record.id != null" >
@ -127,9 +127,6 @@
</update> </update>
<select id="getEmailJobsByDispatchedStatus" resultMap="BaseResultMap"> <select id="getEmailJobsByDispatchedStatus" resultMap="BaseResultMap">
select * from ${platform}_email_jobs select * from ${platform}_email_jobs
where dispatched_status = #{dispatchedStatus} order by id desc where dispatched_status = #{dispatchedStatus} and id = #{id}
<if test="size != null">
limit #{size}
</if>
</select> </select>
</mapper> </mapper>

View File

@ -121,10 +121,7 @@
select a.id, a.email, a.job_id, a.created_at, a.sent_at, a.status, b.subject, b.content select a.id, a.email, a.job_id, a.created_at, a.sent_at, a.status, b.subject, b.content
from ${platform}_email_send_records a from ${platform}_email_send_records a
left join ${platform}_email_jobs b on a.job_id = b.id left join ${platform}_email_jobs b on a.job_id = b.id
where status = #{status} order by id desc where status = #{status} and job_id = #{jobId}
<if test="size != null">
limit #{size}
</if>
</select> </select>
<update id="updateEmailSendRecordsBatch" parameterType="list"> <update id="updateEmailSendRecordsBatch" parameterType="list">
update ${platform}_email_send_records update ${platform}_email_send_records

12
pom.xml
View File

@ -79,6 +79,18 @@
<artifactId>spring-boot-starter-mail</artifactId> <artifactId>spring-boot-starter-mail</artifactId>
<version>${springboot.version}</version> <version>${springboot.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.15.0</version>
</dependency>
<!--https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.15.0</version>
</dependency>
</dependencies> </dependencies>
</project> </project>