Compare commits

...

4 Commits

13 changed files with 51 additions and 33 deletions

View File

@ -35,17 +35,17 @@ public class EmailService {
* 处理邮件发送任务根据emails添加到邮件发送记录表中
*
* @param platform 平台编码
* @param dispatchNumber 待处理发送任务列表数量
* @param emailJobId 邮件任务id
* @return: void
* @Author: wanjia
* @Date: 2021/9/15
*/
@Transactional
public void DispatchEmailJobs(String platform, Integer dispatchNumber) {
public void DispatchEmailJobs(String platform, Integer emailJobId) {
//获取指定数量待处理列表
List<EmailJob> emailJobList = new ArrayList<>();
try {
emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, NotificationSystemConstant.EMAIL_JOB_NOT_DISPATCHED, dispatchNumber);
emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, NotificationSystemConstant.EMAIL_JOB_NOT_DISPATCHED, emailJobId);
} catch (Exception e) {
logger.error("获取未处理邮件任务列表失败:\n" + e);
}
@ -71,17 +71,17 @@ public class EmailService {
* 发送邮件
*
* @param platform 平台编码
* @param sentNumber 一次发送数量
* @param emailJobId 邮件任务id
* @return: void
* @Author: wanjia
* @Date: 2021/9/15
*/
@Transactional
public void sendEmail(String platform, Integer sentNumber) {
public void sendEmail(String platform, Integer emailJobId) {
//获取待发送列表
List<EmailSendRecord> emailSendRecordList = new ArrayList<>();
try {
emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, NotificationSystemConstant.EMAIL_UNSENT_RECORD, sentNumber);
emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, NotificationSystemConstant.EMAIL_UNSENT_RECORD, emailJobId);
} catch (Exception e) {
logger.error("获取未发送邮件列表失败:\n" + e);
}

View File

@ -35,9 +35,10 @@ public class EmailJobsListener {
public void messageHandler(String message) {
try {
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
Boolean flag = emailJobsService.createEmailJob(newEmailJobVo);
int flag = emailJobsService.createEmailJob(newEmailJobVo);
//if the message is inserted successfully, send a new email-job message to kafka
if (flag){
if (flag > 0){
newEmailJobVo.setId(flag);
kafkaUtil.sendMessage(gitlinkNewEmailRemindTopic, JSONObject.toJSONString(newEmailJobVo));
}
} catch (Exception e) {

View File

@ -26,9 +26,9 @@ public class EmailSentListener {
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
//处理邮件任务
emailService.DispatchEmailJobs(newEmailJobVo.getPlatform(), 1);
emailService.DispatchEmailJobs(newEmailJobVo.getPlatform(), newEmailJobVo.getId());
//发送邮件
emailService.sendEmail(newEmailJobVo.getPlatform(), 100);
emailService.sendEmail(newEmailJobVo.getPlatform(), newEmailJobVo.getId());
}

View File

@ -13,6 +13,8 @@ public class NewEmailJobVo {
//平台编码
private String platform;
private Integer id;
@ApiModelProperty(value = "邮件发送者", required = true)
@NotNull(message = "邮件发送者不能为空")
@Range(min = Integer.MIN_VALUE, max = Integer.MAX_VALUE)
@ -40,6 +42,14 @@ public class NewEmailJobVo {
this.platform = platform;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getSender() {
return sender;
}

View File

@ -19,5 +19,5 @@ public interface EmailJobsMapper extends BaseMapper<EmailJob> {
int updateByPrimaryKey(@Param("platform") String platform, @Param("record") EmailJob record);
List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("size") Integer size);
List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("id") Integer emailJobId);
}

View File

@ -22,7 +22,7 @@ public interface EmailSendRecordsMapper extends BaseMapper<EmailSendRecord> {
//批量插入邮件发送任务记录
int insertEmailSendRecordBatch(@Param("platform") String platform,@Param("list") List<EmailSendRecord> emailSendRecordList);
List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("size") Integer sentNumber);
List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("jobId") Integer emailJobId);
//批量更新邮件发送任务记录
int updateEmailSendRecordsBatch(@Param("platform") String platform, @Param("list") List<EmailSendRecord> emailSendRecordList);

View File

@ -13,23 +13,23 @@ public interface EmailJobsService extends IService<EmailJob> {
* 添加发送邮件任务
*
* @param newEmailJobVo
* @return: boolean
* @return: int
* @Author: wanjia
* @Date: 2021/9/13
*/
boolean createEmailJob(NewEmailJobVo newEmailJobVo) throws Exception;
int createEmailJob(NewEmailJobVo newEmailJobVo) throws Exception;
/**
* 获取所有未处理邮件任务列表
*
* @param platform 平台编码
* @param dispatchedStatus 发送状态-1 未处理,1 处理成功,2 处理失败
* @param size 列表大小
* @param emailJobId 邮件任务id
* @return: List<EmailJob>
* @Author: wanjia
* @Date: 2021/9/13
*/
List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) throws Exception;
List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer emailJobId) throws Exception;
/**
* 更新邮件任务状态

View File

@ -23,12 +23,12 @@ public interface EmailSendRecordsService extends IService<EmailSendRecord> {
*
* @param platform 平台编码
* @param status 邮件发送记录状态 -1未发送 1发送成功 2发送失败
* @param size 列表数量
* @param emailJobId 邮件任务id
* @return: List<EmailSendRecord>
* @Author: wanjia
* @Date: 2021/9/15
*/
List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer size) throws Exception;
List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer emailJobId) throws Exception;
/**
* 变更邮件发送记录状态

View File

@ -15,16 +15,17 @@ import java.util.List;
public class EmailJobsServiceImpl extends ServiceImpl<EmailJobsMapper, EmailJob> implements EmailJobsService {
@Override
public boolean createEmailJob(NewEmailJobVo newEmailJobVo) {
public int createEmailJob(NewEmailJobVo newEmailJobVo) {
String platform = newEmailJobVo.getPlatform();
EmailJob emailJob = new EmailJob();
BeanUtils.copyProperties(newEmailJobVo, emailJob);
return baseMapper.insertSelective(platform, emailJob) > 0;
baseMapper.insertSelective(platform, emailJob);
return newEmailJobVo.getId();
}
@Override
public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) {
return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, size);
public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer emailJobId) {
return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, emailJobId);
}
@Override

View File

@ -35,7 +35,7 @@ public class EmailSendRecordsServiceImpl extends ServiceImpl<EmailSendRecordsMap
}
@Override
public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer sentNumber){
return baseMapper.getRecordsByStatus(platform, status, sentNumber);
public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer emailJobId){
return baseMapper.getRecordsByStatus(platform, status, emailJobId);
}
}

View File

@ -32,7 +32,7 @@
#{record.subject,jdbcType=VARCHAR}, #{record.content,jdbcType=VARCHAR}, #{record.createdAt,jdbcType=TIMESTAMP},
#{record.dispatchedAt,jdbcType=TIMESTAMP}, #{record.dispatchedStatus,jdbcType=INTEGER})
</insert>
<insert id="insertSelective" parameterType="cn.org.gitlink.notification.model.dao.entity.EmailJob" >
<insert id="insertSelective" parameterType="cn.org.gitlink.notification.model.dao.entity.EmailJob" useGeneratedKeys="true" keyProperty="id" >
insert into ${platform}_email_jobs
<trim prefix="(" suffix=")" suffixOverrides="," >
<if test="record.id != null" >
@ -127,9 +127,6 @@
</update>
<select id="getEmailJobsByDispatchedStatus" resultMap="BaseResultMap">
select * from ${platform}_email_jobs
where dispatched_status = #{dispatchedStatus} order by id desc
<if test="size != null">
limit #{size}
</if>
where dispatched_status = #{dispatchedStatus} and id = #{id}
</select>
</mapper>

View File

@ -121,10 +121,7 @@
select a.id, a.email, a.job_id, a.created_at, a.sent_at, a.status, b.subject, b.content
from ${platform}_email_send_records a
left join ${platform}_email_jobs b on a.job_id = b.id
where status = #{status} order by id desc
<if test="size != null">
limit #{size}
</if>
where status = #{status} and job_id = #{jobId}
</select>
<update id="updateEmailSendRecordsBatch" parameterType="list">
update ${platform}_email_send_records

12
pom.xml
View File

@ -79,6 +79,18 @@
<artifactId>spring-boot-starter-mail</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.15.0</version>
</dependency>
<!--https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.15.0</version>
</dependency>
</dependencies>
</project>