Compare commits

..

4 Commits

28 changed files with 242 additions and 291 deletions

View File

@ -33,9 +33,6 @@ cp middleware/.env.example middleware/.env
cp reader/src/main/resources/application.yml.example reader/src/main/resources/application.yml cp reader/src/main/resources/application.yml.example reader/src/main/resources/application.yml
cp writer/src/main/resources/application.yml.example writer/src/main/resources/application.yml cp writer/src/main/resources/application.yml.example writer/src/main/resources/application.yml
cp executor/src/main/resources/application.yml.example executor/src/main/resources/application.yml cp executor/src/main/resources/application.yml.example executor/src/main/resources/application.yml
cp reader/src/main/resources/mail.properties.example reader/src/main/resources/mail.properties
cp writer/src/main/resources/mail.properties.example writer/src/main/resources/mail.properties
cp executor/src/main/resources/mail.properties.example executor/src/main/resources/mail.properties
``` ```
5. 修改 `{repo}/middleware/.env` 文件里 `SQL_SCRIPT_PATH``DOCKER_DATA_PATH` 绝对路径到本地磁盘 5. 修改 `{repo}/middleware/.env` 文件里 `SQL_SCRIPT_PATH``DOCKER_DATA_PATH` 绝对路径到本地磁盘

View File

@ -13,12 +13,10 @@ public class NotificationSystemConstant {
//平台编码 //平台编码
public static final String PLATFORM_CODE_GITLINK = "gitlink"; //gitlink平台 public static final String PLATFORM_CODE_GITLINK = "gitlink"; //gitlink平台
public static final String PLATFORM_CODE_HEHUI = "hehui"; //hehui平台 public static final String PLATFORM_CODE_HEHUI = "hehui"; //hehui平台
public static final String PLATFORM_CODE_OSREDM = "osredm"; //红山开源平台
public static final Map<String, String> PLATFORM_CODE_MAP = new HashMap<String, String>() { public static final Map<String, String> PLATFORM_CODE_MAP = new HashMap<String, String>() {
{ {
put("gitlink", PLATFORM_CODE_GITLINK); put("gitlink", PLATFORM_CODE_GITLINK);
put("hehui", PLATFORM_CODE_HEHUI); put("hehui", PLATFORM_CODE_HEHUI);
put("osredm", PLATFORM_CODE_OSREDM);
} }
}; };

View File

@ -23,9 +23,4 @@ CREATE TABLE `gitlink_email_send_records` (
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `index_on_email_and_status` (`email`,`status`), KEY `index_on_email_and_status` (`email`,`status`),
KEY `index_on_status` (`status`) KEY `index_on_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
-- 2023-01-05 更新字符集编码
ALTER TABLE gitlink_email_jobs CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
ALTER TABLE gitlink_email_send_records CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@ -35,7 +35,4 @@ ALTER TABLE gitlink_sys_notification ADD COLUMN (`type` TINYINT(4) NOT NULL DEFA
-- 2021-09-09 新增 source 字段区分消息来源、新增 extra 字段保存额外信息 -- 2021-09-09 新增 source 字段区分消息来源、新增 extra 字段保存额外信息
ALTER TABLE gitlink_sys_notification ADD source varchar(250) NULL COMMENT '消息来源'; ALTER TABLE gitlink_sys_notification ADD source varchar(250) NULL COMMENT '消息来源';
ALTER TABLE gitlink_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)'; ALTER TABLE gitlink_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)';
-- 2023-01-05 更新字符集编码
ALTER TABLE gitlink_sys_notification CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@ -24,7 +24,4 @@ ALTER TABLE hehui_sys_notification ADD COLUMN (`type` TINYINT(4) NOT NULL DEFAUL
-- 2021-09-10 新增 source 字段区分消息来源、新增 extra 字段保存额外信息 -- 2021-09-10 新增 source 字段区分消息来源、新增 extra 字段保存额外信息
ALTER TABLE hehui_sys_notification ADD source varchar(250) NULL COMMENT '消息来源'; ALTER TABLE hehui_sys_notification ADD source varchar(250) NULL COMMENT '消息来源';
ALTER TABLE hehui_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)'; ALTER TABLE hehui_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)';
-- 2023-01-05 更新字符集编码
ALTER TABLE hehui_sys_notification CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@ -1,30 +0,0 @@
USE gitlink_notification;
-- 2022-02-14 新增红山平台
INSERT INTO gns_platform_info(platform_code,platform_name,created_at,is_delete) VALUES('osredm','红山平台',NOW(),-1);
DROP TABLE IF EXISTS `osredm_sys_notification`;
CREATE TABLE `osredm_sys_notification` (
`id` INT NOT NULL AUTO_INCREMENT,
`sender` INT(11) NOT NULL COMMENT '发送者id',
`receiver` INT(11) NOT NULL COMMENT '接受者id',
`content` TEXT NOT NULL COMMENT '消息内容:富文本',
`notification_url` VARCHAR(2000) DEFAULT NULL COMMENT '消息跳转链接',
`created_at` DATETIME NOT NULL DEFAULT NOW() COMMENT '创建时间',
`status` TINYINT(4) NOT NULL DEFAULT 1 COMMENT '已读状态: 1未读2已读',
`is_delete` TINYINT(1) NOT NULL DEFAULT '-1' COMMENT '是否删除: -1未删除1已删除',
PRIMARY KEY (`id`),
KEY `index_on_receiver_and_status` (`receiver`,`status`),
KEY `index_on_status` (`status`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb3;
-- 2021-09-10 区分系统消息类型
ALTER TABLE osredm_sys_notification ADD COLUMN (`type` TINYINT(4) NOT NULL DEFAULT 1 COMMENT '消息类型: 1系统消息2@我');
-- 2021-09-10 新增 source 字段区分消息来源、新增 extra 字段保存额外信息
ALTER TABLE osredm_sys_notification ADD source varchar(250) NULL COMMENT '消息来源';
ALTER TABLE osredm_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)';
-- 2023-01-05 更新字符集编码
ALTER TABLE osredm_sys_notification CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@ -35,17 +35,17 @@ public class EmailService {
* 处理邮件发送任务根据emails添加到邮件发送记录表中 * 处理邮件发送任务根据emails添加到邮件发送记录表中
* *
* @param platform 平台编码 * @param platform 平台编码
* @param dispatchNumber 待处理发送任务列表数量 * @param emailJobId 邮件任务id
* @return: void * @return: void
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/15 * @Date: 2021/9/15
*/ */
@Transactional @Transactional
public void DispatchEmailJobs(String platform, Integer dispatchNumber) { public void DispatchEmailJobs(String platform, Integer emailJobId) {
//获取指定数量待处理列表 //获取指定数量待处理列表
List<EmailJob> emailJobList = new ArrayList<>(); List<EmailJob> emailJobList = new ArrayList<>();
try { try {
emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, NotificationSystemConstant.EMAIL_JOB_NOT_DISPATCHED, dispatchNumber); emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, NotificationSystemConstant.EMAIL_JOB_NOT_DISPATCHED, emailJobId);
} catch (Exception e) { } catch (Exception e) {
logger.error("获取未处理邮件任务列表失败:\n" + e); logger.error("获取未处理邮件任务列表失败:\n" + e);
} }
@ -71,17 +71,17 @@ public class EmailService {
* 发送邮件 * 发送邮件
* *
* @param platform 平台编码 * @param platform 平台编码
* @param sentNumber 一次发送数量 * @param emailJobId 邮件任务id
* @return: void * @return: void
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/15 * @Date: 2021/9/15
*/ */
@Transactional @Transactional
public void sendEmail(String platform, Integer sentNumber) { public void sendEmail(String platform, Integer emailJobId) {
//获取待发送列表 //获取待发送列表
List<EmailSendRecord> emailSendRecordList = new ArrayList<>(); List<EmailSendRecord> emailSendRecordList = new ArrayList<>();
try { try {
emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, NotificationSystemConstant.EMAIL_UNSENT_RECORD, sentNumber); emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, NotificationSystemConstant.EMAIL_UNSENT_RECORD, emailJobId);
} catch (Exception e) { } catch (Exception e) {
logger.error("获取未发送邮件列表失败:\n" + e); logger.error("获取未发送邮件列表失败:\n" + e);
} }
@ -95,8 +95,7 @@ public class EmailService {
flag = true; flag = true;
unSentEmailSendRecord.setSentAt(new Date()); unSentEmailSendRecord.setSentAt(new Date());
unSentEmailSendRecord.setStatus(flag ? NotificationSystemConstant.EMAIL_SENT_SUCCESS : NotificationSystemConstant.EMAIL_SENT_FAIL); unSentEmailSendRecord.setStatus(flag ? NotificationSystemConstant.EMAIL_SENT_SUCCESS : NotificationSystemConstant.EMAIL_SENT_FAIL);
} catch (Exception e) { } catch (MessagingException e) {
unSentEmailSendRecord.setStatus(NotificationSystemConstant.EMAIL_SENT_FAIL);
logger.error("发送邮件失败email: " + unSentEmailSendRecord.getEmail() + "\n" + e); logger.error("发送邮件失败email: " + unSentEmailSendRecord.getEmail() + "\n" + e);
} }
} }

View File

@ -35,9 +35,10 @@ public class EmailJobsListener {
public void messageHandler(String message) { public void messageHandler(String message) {
try { try {
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class); NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
Boolean flag = emailJobsService.createEmailJob(newEmailJobVo); int flag = emailJobsService.createEmailJob(newEmailJobVo);
//if the message is inserted successfully, send a new email-job message to kafka //if the message is inserted successfully, send a new email-job message to kafka
if (flag){ if (flag > 0){
newEmailJobVo.setId(flag);
kafkaUtil.sendMessage(gitlinkNewEmailRemindTopic, JSONObject.toJSONString(newEmailJobVo)); kafkaUtil.sendMessage(gitlinkNewEmailRemindTopic, JSONObject.toJSONString(newEmailJobVo));
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -26,9 +26,9 @@ public class EmailSentListener {
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class); NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
//处理邮件任务 //处理邮件任务
emailService.DispatchEmailJobs(newEmailJobVo.getPlatform(), 1); emailService.DispatchEmailJobs(newEmailJobVo.getPlatform(), newEmailJobVo.getId());
//发送邮件 //发送邮件
emailService.sendEmail(newEmailJobVo.getPlatform(), 100); emailService.sendEmail(newEmailJobVo.getPlatform(), newEmailJobVo.getId());
} }

0
middleware/end_docker_compose.sh Normal file → Executable file
View File

View File

@ -1,181 +1,172 @@
version: '3' version: '3'
services: services:
mysql: mysql:
image: mysql:${MYSQL_VERSION} image: mysql:${MYSQL_VERSION}
container_name: ${MYSQL_CONTAINER_NAME} container_name: ${MYSQL_CONTAINER_NAME}
hostname: mysql hostname: mysql
environment: environment:
- MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD} - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD}
- MYSQL_USER=${MYSQL_USER} - MYSQL_USER=${MYSQL_USER}
- MYSQL_PASSWORD=${MYSQL_PASSWORD} - MYSQL_PASSWORD=${MYSQL_PASSWORD}
- MYSQL_DATABASE=${MYSQL_DATABASE} - MYSQL_DATABASE=${MYSQL_DATABASE}
- TZ=Asia/Shanghai - TZ=Asia/Shanghai
volumes: volumes:
- ${DOCKER_DATA_PATH}/mysql:/var/lib/mysql - ${DOCKER_DATA_PATH}/mysql:/var/lib/mysql
- ${SQL_SCRIPT_PATH}/gns-notification.sql:/docker-entrypoint-initdb.d/0001.sql - ${SQL_SCRIPT_PATH}/gns-notification.sql:/docker-entrypoint-initdb.d/0001.sql
- ${SQL_SCRIPT_PATH}/hehui-gns-notification.sql:/docker-entrypoint-initdb.d/0002.sql - ${SQL_SCRIPT_PATH}/hehui-gns-notification.sql:/docker-entrypoint-initdb.d/0002.sql
- ${SQL_SCRIPT_PATH}/gns-email.sql:/docker-entrypoint-initdb.d/0003.sql - ${SQL_SCRIPT_PATH}/gns-email.sql:/docker-entrypoint-initdb.d/0003.sql
- ${SQL_SCRIPT_PATH}/osredm-gns-notification.sql:/docker-entrypoint-initdb.d/0004.sql command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci ports:
ports: - ${MYSQL_LOCAL_PORT}:3306
- ${MYSQL_LOCAL_PORT}:3306 networks:
networks: - gitlink_network
- gitlink_network
restart: always redis:
image: redis:${REDIS_VERSION}
redis: container_name: ${REDIS_CONTAINER_NAME}
image: redis:${REDIS_VERSION} hostname: redis
container_name: ${REDIS_CONTAINER_NAME} volumes:
hostname: redis - ${DOCKER_DATA_PATH}/redis/data:/data
volumes: - ${DOCKER_DATA_PATH}/redis/logs:/logs
- ${DOCKER_DATA_PATH}/redis/data:/data environment:
- ${DOCKER_DATA_PATH}/redis/logs:/logs - TZ=Asia/Shanghai
environment: ports:
- TZ=Asia/Shanghai - ${REDIS_LOCAL_PORT}:6379
ports: networks:
- ${REDIS_LOCAL_PORT}:6379 - gitlink_network
networks:
- gitlink_network # See Also: https://hub.docker.com/r/confluentinc/cp-zookeeper
restart: always zookeeper:
image: confluentinc/cp-zookeeper:latest
# See Also: https://hub.docker.com/r/confluentinc/cp-zookeeper container_name: ${ZOOKEEPER_CONTAINER_NAME}
zookeeper: hostname: zookeeper
image: confluentinc/cp-zookeeper:latest environment:
container_name: ${ZOOKEEPER_CONTAINER_NAME} ZOOKEEPER_CLIENT_PORT: 2181
hostname: zookeeper ZOOKEEPER_TICK_TIME: 2000
environment: ports:
ZOOKEEPER_CLIENT_PORT: 2181 - ${ZOOKEEPER_LOCAL_PORT}:2181
ZOOKEEPER_TICK_TIME: 2000 # volumes:
ports: # - ${DOCKER_DATA_PATH}/zookeeper:/var/lib/zookeeper
- ${ZOOKEEPER_LOCAL_PORT}:2181 networks:
# volumes: - gitlink_network
# - ${DOCKER_DATA_PATH}/zookeeper:/var/lib/zookeeper
networks: # See Also: https://hub.docker.com/r/confluentinc/cp-kafka
- gitlink_network kafka1:
restart: always image: confluentinc/cp-kafka:latest
container_name: ${KAFKA_CONTAINER_01_NAME}
# See Also: https://hub.docker.com/r/confluentinc/cp-kafka hostname: kafka1
kafka1: depends_on:
image: confluentinc/cp-kafka:latest - zookeeper
container_name: ${KAFKA_CONTAINER_01_NAME} ports:
hostname: kafka1 - ${KAFKA_01_LOCAL_PORT}:29092
depends_on: # volumes:
- zookeeper # - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/lib:/var/lib/kafka
ports: # - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/logs:/var/logs/kafka
- ${KAFKA_01_LOCAL_PORT}:29092 # - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/conf:/etc/kafka
# volumes: environment:
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/lib:/var/lib/kafka KAFKA_BROKER_ID: 1
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/logs:/var/logs/kafka KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/conf:/etc/kafka KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:${KAFKA_01_LOCAL_PORT}
environment: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_BROKER_ID: 1 KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:${KAFKA_01_LOCAL_PORT} networks:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - gitlink_network
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # See Also: https://hub.docker.com/r/confluentinc/cp-kafka
networks: kafka2:
- gitlink_network image: confluentinc/cp-kafka:latest
restart: always container_name: ${KAFKA_CONTAINER_02_NAME}
hostname: kafka2
# See Also: https://hub.docker.com/r/confluentinc/cp-kafka depends_on:
kafka2: - zookeeper
image: confluentinc/cp-kafka:latest ports:
container_name: ${KAFKA_CONTAINER_02_NAME} - ${KAFKA_02_LOCAL_PORT}:39092
hostname: kafka2 # volumes:
depends_on: # - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/lib:/var/lib/kafka
- zookeeper # - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/logs:/var/logs/kafka
ports: # - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/conf:/etc/kafka
- ${KAFKA_02_LOCAL_PORT}:39092 environment:
# volumes: KAFKA_BROKER_ID: 2
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/lib:/var/lib/kafka KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/logs:/var/logs/kafka KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:${KAFKA_02_LOCAL_PORT}
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/conf:/etc/kafka KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
environment: KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_BROKER_ID: 2 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 networks:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:${KAFKA_02_LOCAL_PORT} - gitlink_network
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT gitlink-reader:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 container_name: ${GNS_READER_CONTAINER_NAME}
networks: hostname: gitlink_reader
- gitlink_network image: gitlink/gns-reader:${GITLINK_NOTIFICATION_SYS_VERSION}
restart: always build:
context: ../
gitlink-reader: dockerfile: middleware/reader.Dockerfile
container_name: ${GNS_READER_CONTAINER_NAME} networks:
hostname: gitlink_reader - gitlink_network
image: gitlink/gns-reader:${GITLINK_NOTIFICATION_SYS_VERSION} environment:
build: - TZ=Asia/Shanghai
context: ../ volumes:
dockerfile: middleware/reader.Dockerfile - ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
networks: depends_on:
- gitlink_network - kafka1
environment: - kafka2
- TZ=Asia/Shanghai - redis
volumes: - mysql
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/ ports:
depends_on: - ${GNS_READER_LOCAL_PORT}:8081
- kafka1
- kafka2 gitlink-writer:
- redis container_name: ${GNS_WRITER_CONTAINER_NAME}
- mysql hostname: gitlink_writer
ports: image: gitlink/gns-writer:${GITLINK_NOTIFICATION_SYS_VERSION}
- ${GNS_READER_LOCAL_PORT}:8081 build:
restart: always context: ../
dockerfile: middleware/writer.Dockerfile
gitlink-writer: networks:
container_name: ${GNS_WRITER_CONTAINER_NAME} - gitlink_network
hostname: gitlink_writer environment:
image: gitlink/gns-writer:${GITLINK_NOTIFICATION_SYS_VERSION} - TZ=Asia/Shanghai
build: volumes:
context: ../ - ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
dockerfile: middleware/writer.Dockerfile depends_on:
networks: - kafka1
- gitlink_network - kafka2
environment: - redis
- TZ=Asia/Shanghai - mysql
volumes: ports:
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/ - ${GNS_WRITER_LOCAL_PORT}:8082
depends_on:
- kafka1 gitlink-executor:
- kafka2 container_name: ${GNS_EXECUTOR_CONTAINER_NAME}
- redis hostname: gitlink_executor
- mysql image: gitlink/gns-executor:${GITLINK_NOTIFICATION_SYS_VERSION}
ports: build:
- ${GNS_WRITER_LOCAL_PORT}:8082 context: ../
restart: always dockerfile: middleware/executor.Dockerfile
networks:
gitlink-executor: - gitlink_network
container_name: ${GNS_EXECUTOR_CONTAINER_NAME} volumes:
hostname: gitlink_executor - ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
image: gitlink/gns-executor:${GITLINK_NOTIFICATION_SYS_VERSION} environment:
build: - TZ=Asia/Shanghai
context: ../ depends_on:
dockerfile: middleware/executor.Dockerfile - kafka1
networks: - kafka2
- gitlink_network - redis
volumes: - mysql
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/ ports:
environment: - ${GNS_EXECUTOR_LOCAL_PORT}:8083
- TZ=Asia/Shanghai
depends_on: networks:
- kafka1 gitlink_network:
- kafka2 driver: bridge
- redis name: gitlink_network
- mysql driver_opts:
ports: com.docker.network.enable_ipv6: "true"
- ${GNS_EXECUTOR_LOCAL_PORT}:8083
restart: always
networks:
gitlink_network:
driver: bridge
name: gitlink_network
driver_opts:
com.docker.network.enable_ipv6: "true"

0
middleware/start_docker_compose.sh Normal file → Executable file
View File

View File

@ -13,6 +13,8 @@ public class NewEmailJobVo {
//平台编码 //平台编码
private String platform; private String platform;
private Integer id;
@ApiModelProperty(value = "邮件发送者", required = true) @ApiModelProperty(value = "邮件发送者", required = true)
@NotNull(message = "邮件发送者不能为空") @NotNull(message = "邮件发送者不能为空")
@Range(min = Integer.MIN_VALUE, max = Integer.MAX_VALUE) @Range(min = Integer.MIN_VALUE, max = Integer.MAX_VALUE)
@ -40,6 +42,14 @@ public class NewEmailJobVo {
this.platform = platform; this.platform = platform;
} }
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getSender() { public Integer getSender() {
return sender; return sender;
} }

View File

@ -19,5 +19,5 @@ public interface EmailJobsMapper extends BaseMapper<EmailJob> {
int updateByPrimaryKey(@Param("platform") String platform, @Param("record") EmailJob record); int updateByPrimaryKey(@Param("platform") String platform, @Param("record") EmailJob record);
List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("size") Integer size); List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("id") Integer emailJobId);
} }

View File

@ -22,7 +22,7 @@ public interface EmailSendRecordsMapper extends BaseMapper<EmailSendRecord> {
//批量插入邮件发送任务记录 //批量插入邮件发送任务记录
int insertEmailSendRecordBatch(@Param("platform") String platform,@Param("list") List<EmailSendRecord> emailSendRecordList); int insertEmailSendRecordBatch(@Param("platform") String platform,@Param("list") List<EmailSendRecord> emailSendRecordList);
List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("size") Integer sentNumber); List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("jobId") Integer emailJobId);
//批量更新邮件发送任务记录 //批量更新邮件发送任务记录
int updateEmailSendRecordsBatch(@Param("platform") String platform, @Param("list") List<EmailSendRecord> emailSendRecordList); int updateEmailSendRecordsBatch(@Param("platform") String platform, @Param("list") List<EmailSendRecord> emailSendRecordList);

View File

@ -39,7 +39,6 @@ public interface SysNotificationMapper extends BaseMapper<SysNotification> {
List<SysNotification> getSysNotificationPageList(Page page, String orderBy, List<SysNotification> getSysNotificationPageList(Page page, String orderBy,
@Param("type") int type, @Param("type") int type,
@Param("sources") String sources,
@Param("platform") String platform, @Param("platform") String platform,
@Param("receiver") Integer receiver, @Param("receiver") Integer receiver,
@Param("status") Integer status); @Param("status") Integer status);

View File

@ -13,23 +13,23 @@ public interface EmailJobsService extends IService<EmailJob> {
* 添加发送邮件任务 * 添加发送邮件任务
* *
* @param newEmailJobVo * @param newEmailJobVo
* @return: boolean * @return: int
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/13 * @Date: 2021/9/13
*/ */
boolean createEmailJob(NewEmailJobVo newEmailJobVo) throws Exception; int createEmailJob(NewEmailJobVo newEmailJobVo) throws Exception;
/** /**
* 获取所有未处理邮件任务列表 * 获取所有未处理邮件任务列表
* *
* @param platform 平台编码 * @param platform 平台编码
* @param dispatchedStatus 发送状态-1 未处理,1 处理成功,2 处理失败 * @param dispatchedStatus 发送状态-1 未处理,1 处理成功,2 处理失败
* @param size 列表大小 * @param emailJobId 邮件任务id
* @return: List<EmailJob> * @return: List<EmailJob>
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/13 * @Date: 2021/9/13
*/ */
List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) throws Exception; List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer emailJobId) throws Exception;
/** /**
* 更新邮件任务状态 * 更新邮件任务状态

View File

@ -23,12 +23,12 @@ public interface EmailSendRecordsService extends IService<EmailSendRecord> {
* *
* @param platform 平台编码 * @param platform 平台编码
* @param status 邮件发送记录状态 -1未发送 1发送成功 2发送失败 * @param status 邮件发送记录状态 -1未发送 1发送成功 2发送失败
* @param size 列表数量 * @param emailJobId 邮件任务id
* @return: List<EmailSendRecord> * @return: List<EmailSendRecord>
* @Author: wanjia * @Author: wanjia
* @Date: 2021/9/15 * @Date: 2021/9/15
*/ */
List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer size) throws Exception; List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer emailJobId) throws Exception;
/** /**
* 变更邮件发送记录状态 * 变更邮件发送记录状态

View File

@ -36,17 +36,16 @@ public interface SysNotificationService extends IService<SysNotification> {
/** /**
* 获取消息列表 * 获取消息列表
* *
* @param type 类型 -1 全部 1 系统消息2 @我
* @param page 页码
* @param size 页大小
* @param platform 平台编号 * @param platform 平台编号
* @param receiver 消息接收者 * @param receiver 消息接收者
* @param status 状态 -1 全部1 未读 2 已读 * @param status 状态 -1 全部1 未读 2 已读
* @param type 类型 -1 全部 1 系统消息2 @我
* @param sources
* @param page 页码
* @param size 页大小
* @return * @return
*/ */
Page<SysNotification> getNotification(String platform, Integer receiver, Integer status, Integer type, String sources, Integer page, Integer size) throws Exception; Page<SysNotification> getNotification(String platform, Integer receiver, Integer status, Integer type, Integer page, Integer size) throws Exception;
/** /**
* @Description: 批量删除系统消息 * @Description: 批量删除系统消息

View File

@ -15,16 +15,17 @@ import java.util.List;
public class EmailJobsServiceImpl extends ServiceImpl<EmailJobsMapper, EmailJob> implements EmailJobsService { public class EmailJobsServiceImpl extends ServiceImpl<EmailJobsMapper, EmailJob> implements EmailJobsService {
@Override @Override
public boolean createEmailJob(NewEmailJobVo newEmailJobVo) { public int createEmailJob(NewEmailJobVo newEmailJobVo) {
String platform = newEmailJobVo.getPlatform(); String platform = newEmailJobVo.getPlatform();
EmailJob emailJob = new EmailJob(); EmailJob emailJob = new EmailJob();
BeanUtils.copyProperties(newEmailJobVo, emailJob); BeanUtils.copyProperties(newEmailJobVo, emailJob);
return baseMapper.insertSelective(platform, emailJob) > 0; baseMapper.insertSelective(platform, emailJob);
return newEmailJobVo.getId();
} }
@Override @Override
public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) { public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer emailJobId) {
return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, size); return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, emailJobId);
} }
@Override @Override

View File

@ -35,7 +35,7 @@ public class EmailSendRecordsServiceImpl extends ServiceImpl<EmailSendRecordsMap
} }
@Override @Override
public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer sentNumber){ public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer emailJobId){
return baseMapper.getRecordsByStatus(platform, status, sentNumber); return baseMapper.getRecordsByStatus(platform, status, emailJobId);
} }
} }

View File

@ -28,7 +28,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
@Override @Override
@Transactional @Transactional
public boolean sendNotification(NewSysNotificationVo newSysNotificationVo) { public boolean sendNotification(NewSysNotificationVo newSysNotificationVo) throws Exception {
List<SysNotification> sysNotificationList = new ArrayList<>(); List<SysNotification> sysNotificationList = new ArrayList<>();
List<String> list = Arrays.asList(newSysNotificationVo.getReceivers().split(",")); List<String> list = Arrays.asList(newSysNotificationVo.getReceivers().split(","));
for (String receiver : list) { for (String receiver : list) {
@ -51,7 +51,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
} }
@Override @Override
public int markNotificationAs(String platform, Integer receiver, String notificationIds, Integer status, Integer type) { public int markNotificationAs(String platform, Integer receiver, String notificationIds, Integer status, Integer type) throws Exception {
int count = baseMapper.updateStatusByNotificationId(platform, receiver, notificationIds, status, type); int count = baseMapper.updateStatusByNotificationId(platform, receiver, notificationIds, status, type);
if (count > 0) { if (count > 0) {
this.delUserCache(platform, receiver); this.delUserCache(platform, receiver);
@ -60,7 +60,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
} }
@Override @Override
public int getNotificationCount(String platform, Integer receiver, Integer type, Integer status) { public int getNotificationCount(String platform, Integer receiver, Integer type, Integer status) throws Exception {
String cacheKey = cacheKeyForCount(platform, receiver, type, status); String cacheKey = cacheKeyForCount(platform, receiver, type, status);
Object foundResult = this.redisUtil.get(cacheKey); Object foundResult = this.redisUtil.get(cacheKey);
if (foundResult != null) { if (foundResult != null) {
@ -73,8 +73,8 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
@Override @Override
public Page<SysNotification> getNotification(String platform, Integer receiver, Integer status, Integer type, String sources, Integer page, Integer size) { public Page<SysNotification> getNotification(String platform, Integer receiver, Integer status, Integer type, Integer page, Integer size) throws Exception {
String cacheKey = cacheKeyForPage(platform, receiver, type, sources, status, page, size); String cacheKey = cacheKeyForPage(platform, receiver, type, status, page, size);
Object foundResult = this.redisUtil.get(cacheKey); Object foundResult = this.redisUtil.get(cacheKey);
if (foundResult != null) { if (foundResult != null) {
return (Page<SysNotification>) foundResult; return (Page<SysNotification>) foundResult;
@ -82,7 +82,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
Page<SysNotification> pageItem = new Page<SysNotification>(page, size); Page<SysNotification> pageItem = new Page<SysNotification>(page, size);
List<SysNotification> sysNotificationList = baseMapper.getSysNotificationPageList( List<SysNotification> sysNotificationList = baseMapper.getSysNotificationPageList(
pageItem, "", type, sources, platform, receiver, status pageItem, "", type, platform, receiver, status
); );
pageItem.setRecords(sysNotificationList); pageItem.setRecords(sysNotificationList);
this.redisUtil.set(cacheKey, pageItem); this.redisUtil.set(cacheKey, pageItem);
@ -90,7 +90,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
} }
@Override @Override
public int deleteNotifications(String platform, Integer receiver, String notificationIds, Integer type) { public int deleteNotifications(String platform, Integer receiver, String notificationIds, Integer type) throws Exception {
int count = baseMapper.deleteNotificationByIds(platform, receiver, notificationIds, type); int count = baseMapper.deleteNotificationByIds(platform, receiver, notificationIds, type);
if (count > 0) { if (count > 0) {
this.delUserCache(platform, receiver); this.delUserCache(platform, receiver);
@ -107,8 +107,8 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
return String.format("%s#T%s#S%s#Count", cachePrefixForPlatform(platform, receiver), type, status); return String.format("%s#T%s#S%s#Count", cachePrefixForPlatform(platform, receiver), type, status);
} }
private static String cacheKeyForPage(String platform, Integer receiver, Integer type, String sources, Integer status, Integer page, Integer size) { private static String cacheKeyForPage(String platform, Integer receiver, Integer type, Integer status, Integer page, Integer size) {
return String.format("%s#T%s#%s#S%s#P%s_S%s", cachePrefixForPlatform(platform, receiver), type, sources, status, page, size); return String.format("%s#T%s#S%s#P%s_S%s", cachePrefixForPlatform(platform, receiver), type, status, page, size);
} }
private static String cachePrefixForPlatform(String platform, Integer receiver) { private static String cachePrefixForPlatform(String platform, Integer receiver) {

View File

@ -32,7 +32,7 @@
#{record.subject,jdbcType=VARCHAR}, #{record.content,jdbcType=VARCHAR}, #{record.createdAt,jdbcType=TIMESTAMP}, #{record.subject,jdbcType=VARCHAR}, #{record.content,jdbcType=VARCHAR}, #{record.createdAt,jdbcType=TIMESTAMP},
#{record.dispatchedAt,jdbcType=TIMESTAMP}, #{record.dispatchedStatus,jdbcType=INTEGER}) #{record.dispatchedAt,jdbcType=TIMESTAMP}, #{record.dispatchedStatus,jdbcType=INTEGER})
</insert> </insert>
<insert id="insertSelective" parameterType="cn.org.gitlink.notification.model.dao.entity.EmailJob" > <insert id="insertSelective" parameterType="cn.org.gitlink.notification.model.dao.entity.EmailJob" useGeneratedKeys="true" keyProperty="id" >
insert into ${platform}_email_jobs insert into ${platform}_email_jobs
<trim prefix="(" suffix=")" suffixOverrides="," > <trim prefix="(" suffix=")" suffixOverrides="," >
<if test="record.id != null" > <if test="record.id != null" >
@ -127,9 +127,6 @@
</update> </update>
<select id="getEmailJobsByDispatchedStatus" resultMap="BaseResultMap"> <select id="getEmailJobsByDispatchedStatus" resultMap="BaseResultMap">
select * from ${platform}_email_jobs select * from ${platform}_email_jobs
where dispatched_status = #{dispatchedStatus} order by id desc where dispatched_status = #{dispatchedStatus} and id = #{id}
<if test="size != null">
limit #{size}
</if>
</select> </select>
</mapper> </mapper>

View File

@ -121,10 +121,7 @@
select a.id, a.email, a.job_id, a.created_at, a.sent_at, a.status, b.subject, b.content select a.id, a.email, a.job_id, a.created_at, a.sent_at, a.status, b.subject, b.content
from ${platform}_email_send_records a from ${platform}_email_send_records a
left join ${platform}_email_jobs b on a.job_id = b.id left join ${platform}_email_jobs b on a.job_id = b.id
where status = #{status} order by id desc where status = #{status} and job_id = #{jobId}
<if test="size != null">
limit #{size}
</if>
</select> </select>
<update id="updateEmailSendRecordsBatch" parameterType="list"> <update id="updateEmailSendRecordsBatch" parameterType="list">
update ${platform}_email_send_records update ${platform}_email_send_records

View File

@ -202,12 +202,6 @@
<if test="status != -1"> <if test="status != -1">
and status = #{status} and status = #{status}
</if> </if>
<if test="sources != null and sources != ''">
and source in
<foreach collection="sources.split(',')" item="sources" open="(" separator="," close=")">
#{sources}
</foreach>
</if>
ORDER BY id DESC ORDER BY id DESC
</select> </select>
<update id="deleteNotificationByIds"> <update id="deleteNotificationByIds">

View File

@ -28,7 +28,7 @@ public class ServiceTests {
@Test @Test
public void testSysNotificationService() throws Exception { public void testSysNotificationService() throws Exception {
int i = sysNotificationService.getNotificationCount("gitlink", 234,1,1); int i = sysNotificationService.getNotificationCount("gitlink", 234,1,1);
Page<SysNotification> sysNotificationPage = sysNotificationService.getNotification("gitlink", 100,1,1, "IssueChanged", 1,20); Page<SysNotification> sysNotificationPage = sysNotificationService.getNotification("gitlink", 100,1,1,1,20);
NewSysNotificationVo newSysNotificationVo = new NewSysNotificationVo(); NewSysNotificationVo newSysNotificationVo = new NewSysNotificationVo();
newSysNotificationVo.setSender(1); newSysNotificationVo.setSender(1);
newSysNotificationVo.setReceivers("7,8"); newSysNotificationVo.setReceivers("7,8");

12
pom.xml
View File

@ -79,6 +79,18 @@
<artifactId>spring-boot-starter-mail</artifactId> <artifactId>spring-boot-starter-mail</artifactId>
<version>${springboot.version}</version> <version>${springboot.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.15.0</version>
</dependency>
<!--https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.15.0</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -52,9 +52,6 @@ public class NotificationController {
@ApiParam(value = "消息类型:值为-1时获取全部信息值为1时获取系统消息值为2时获取@我消息", defaultValue = "-1") @ApiParam(value = "消息类型:值为-1时获取全部信息值为1时获取系统消息值为2时获取@我消息", defaultValue = "-1")
@RequestParam(name = "type", required = false, defaultValue = "-1") Integer type, @RequestParam(name = "type", required = false, defaultValue = "-1") Integer type,
@ApiParam(value = "消息来源")
@RequestParam(name = "sources", required = false) String sources,
@ApiParam(value = "页码:值为-1时默认值不开启分页", required = false, defaultValue = "-1") @ApiParam(value = "页码:值为-1时默认值不开启分页", required = false, defaultValue = "-1")
@RequestParam(name = "page", required = false, defaultValue = "-1") Integer page, @RequestParam(name = "page", required = false, defaultValue = "-1") Integer page,
@ -91,7 +88,7 @@ public class NotificationController {
} }
//分页的数据 //分页的数据
Page foundPage = notificationService.getNotification(platform, receiver, status, type, sources, page, size); Page foundPage = notificationService.getNotification(platform, receiver, status, type, page, size);
if (foundPage != null) { if (foundPage != null) {
notificationListVo.setPageNum(foundPage.getCurrent()); notificationListVo.setPageNum(foundPage.getCurrent());
notificationListVo.setPageSize(foundPage.getSize()); notificationListVo.setPageSize(foundPage.getSize());