Compare commits
4 Commits
master
...
dev_gitlin
Author | SHA1 | Date |
---|---|---|
|
6a593ad8f9 | |
|
3fb8ab9015 | |
![]() |
826bc3491e | |
![]() |
65a0c6a3a1 |
|
@ -33,9 +33,6 @@ cp middleware/.env.example middleware/.env
|
|||
cp reader/src/main/resources/application.yml.example reader/src/main/resources/application.yml
|
||||
cp writer/src/main/resources/application.yml.example writer/src/main/resources/application.yml
|
||||
cp executor/src/main/resources/application.yml.example executor/src/main/resources/application.yml
|
||||
cp reader/src/main/resources/mail.properties.example reader/src/main/resources/mail.properties
|
||||
cp writer/src/main/resources/mail.properties.example writer/src/main/resources/mail.properties
|
||||
cp executor/src/main/resources/mail.properties.example executor/src/main/resources/mail.properties
|
||||
```
|
||||
|
||||
5. 修改 `{repo}/middleware/.env` 文件里 `SQL_SCRIPT_PATH` 和 `DOCKER_DATA_PATH` 绝对路径到本地磁盘
|
||||
|
|
|
@ -13,12 +13,10 @@ public class NotificationSystemConstant {
|
|||
//平台编码
|
||||
public static final String PLATFORM_CODE_GITLINK = "gitlink"; //gitlink平台
|
||||
public static final String PLATFORM_CODE_HEHUI = "hehui"; //hehui平台
|
||||
public static final String PLATFORM_CODE_OSREDM = "osredm"; //红山开源平台
|
||||
public static final Map<String, String> PLATFORM_CODE_MAP = new HashMap<String, String>() {
|
||||
{
|
||||
put("gitlink", PLATFORM_CODE_GITLINK);
|
||||
put("hehui", PLATFORM_CODE_HEHUI);
|
||||
put("osredm", PLATFORM_CODE_OSREDM);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -23,9 +23,4 @@ CREATE TABLE `gitlink_email_send_records` (
|
|||
PRIMARY KEY (`id`),
|
||||
KEY `index_on_email_and_status` (`email`,`status`),
|
||||
KEY `index_on_status` (`status`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
|
||||
|
||||
|
||||
-- 2023-01-05 更新字符集编码
|
||||
ALTER TABLE gitlink_email_jobs CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
ALTER TABLE gitlink_email_send_records CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
|
|
@ -35,7 +35,4 @@ ALTER TABLE gitlink_sys_notification ADD COLUMN (`type` TINYINT(4) NOT NULL DEFA
|
|||
|
||||
-- 2021-09-09 新增 source 字段区分消息来源、新增 extra 字段保存额外信息
|
||||
ALTER TABLE gitlink_sys_notification ADD source varchar(250) NULL COMMENT '消息来源';
|
||||
ALTER TABLE gitlink_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)';
|
||||
|
||||
-- 2023-01-05 更新字符集编码
|
||||
ALTER TABLE gitlink_sys_notification CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
ALTER TABLE gitlink_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)';
|
|
@ -24,7 +24,4 @@ ALTER TABLE hehui_sys_notification ADD COLUMN (`type` TINYINT(4) NOT NULL DEFAUL
|
|||
|
||||
-- 2021-09-10 新增 source 字段区分消息来源、新增 extra 字段保存额外信息
|
||||
ALTER TABLE hehui_sys_notification ADD source varchar(250) NULL COMMENT '消息来源';
|
||||
ALTER TABLE hehui_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)';
|
||||
|
||||
-- 2023-01-05 更新字符集编码
|
||||
ALTER TABLE hehui_sys_notification CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
ALTER TABLE hehui_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)';
|
|
@ -1,30 +0,0 @@
|
|||
|
||||
USE gitlink_notification;
|
||||
|
||||
-- 2022-02-14 新增红山平台
|
||||
INSERT INTO gns_platform_info(platform_code,platform_name,created_at,is_delete) VALUES('osredm','红山平台',NOW(),-1);
|
||||
|
||||
DROP TABLE IF EXISTS `osredm_sys_notification`;
|
||||
CREATE TABLE `osredm_sys_notification` (
|
||||
`id` INT NOT NULL AUTO_INCREMENT,
|
||||
`sender` INT(11) NOT NULL COMMENT '发送者id',
|
||||
`receiver` INT(11) NOT NULL COMMENT '接受者id',
|
||||
`content` TEXT NOT NULL COMMENT '消息内容:富文本',
|
||||
`notification_url` VARCHAR(2000) DEFAULT NULL COMMENT '消息跳转链接',
|
||||
`created_at` DATETIME NOT NULL DEFAULT NOW() COMMENT '创建时间',
|
||||
`status` TINYINT(4) NOT NULL DEFAULT 1 COMMENT '已读状态: 1未读,2已读',
|
||||
`is_delete` TINYINT(1) NOT NULL DEFAULT '-1' COMMENT '是否删除: -1未删除,1已删除',
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `index_on_receiver_and_status` (`receiver`,`status`),
|
||||
KEY `index_on_status` (`status`)
|
||||
) ENGINE=INNODB DEFAULT CHARSET=utf8mb3;
|
||||
|
||||
-- 2021-09-10 区分系统消息类型
|
||||
ALTER TABLE osredm_sys_notification ADD COLUMN (`type` TINYINT(4) NOT NULL DEFAULT 1 COMMENT '消息类型: 1系统消息,2@我');
|
||||
|
||||
-- 2021-09-10 新增 source 字段区分消息来源、新增 extra 字段保存额外信息
|
||||
ALTER TABLE osredm_sys_notification ADD source varchar(250) NULL COMMENT '消息来源';
|
||||
ALTER TABLE osredm_sys_notification ADD extra TEXT NULL COMMENT '额外信息(备用字段)';
|
||||
|
||||
-- 2023-01-05 更新字符集编码
|
||||
ALTER TABLE osredm_sys_notification CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
|
@ -35,17 +35,17 @@ public class EmailService {
|
|||
* 处理邮件发送任务,根据emails添加到邮件发送记录表中
|
||||
*
|
||||
* @param platform 平台编码
|
||||
* @param dispatchNumber 待处理发送任务列表数量
|
||||
* @param emailJobId 邮件任务id
|
||||
* @return: void
|
||||
* @Author: wanjia
|
||||
* @Date: 2021/9/15
|
||||
*/
|
||||
@Transactional
|
||||
public void DispatchEmailJobs(String platform, Integer dispatchNumber) {
|
||||
public void DispatchEmailJobs(String platform, Integer emailJobId) {
|
||||
//获取指定数量待处理列表
|
||||
List<EmailJob> emailJobList = new ArrayList<>();
|
||||
try {
|
||||
emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, NotificationSystemConstant.EMAIL_JOB_NOT_DISPATCHED, dispatchNumber);
|
||||
emailJobList = emailJobsService.getEmailJobsByDispatchedStatus(platform, NotificationSystemConstant.EMAIL_JOB_NOT_DISPATCHED, emailJobId);
|
||||
} catch (Exception e) {
|
||||
logger.error("获取未处理邮件任务列表失败:\n" + e);
|
||||
}
|
||||
|
@ -71,17 +71,17 @@ public class EmailService {
|
|||
* 发送邮件
|
||||
*
|
||||
* @param platform 平台编码
|
||||
* @param sentNumber 一次发送数量
|
||||
* @param emailJobId 邮件任务id
|
||||
* @return: void
|
||||
* @Author: wanjia
|
||||
* @Date: 2021/9/15
|
||||
*/
|
||||
@Transactional
|
||||
public void sendEmail(String platform, Integer sentNumber) {
|
||||
public void sendEmail(String platform, Integer emailJobId) {
|
||||
//获取待发送列表
|
||||
List<EmailSendRecord> emailSendRecordList = new ArrayList<>();
|
||||
try {
|
||||
emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, NotificationSystemConstant.EMAIL_UNSENT_RECORD, sentNumber);
|
||||
emailSendRecordList = emailSendRecordsService.getRecordsByStatus(platform, NotificationSystemConstant.EMAIL_UNSENT_RECORD, emailJobId);
|
||||
} catch (Exception e) {
|
||||
logger.error("获取未发送邮件列表失败:\n" + e);
|
||||
}
|
||||
|
@ -95,8 +95,7 @@ public class EmailService {
|
|||
flag = true;
|
||||
unSentEmailSendRecord.setSentAt(new Date());
|
||||
unSentEmailSendRecord.setStatus(flag ? NotificationSystemConstant.EMAIL_SENT_SUCCESS : NotificationSystemConstant.EMAIL_SENT_FAIL);
|
||||
} catch (Exception e) {
|
||||
unSentEmailSendRecord.setStatus(NotificationSystemConstant.EMAIL_SENT_FAIL);
|
||||
} catch (MessagingException e) {
|
||||
logger.error("发送邮件失败,email: " + unSentEmailSendRecord.getEmail() + "\n" + e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,9 +35,10 @@ public class EmailJobsListener {
|
|||
public void messageHandler(String message) {
|
||||
try {
|
||||
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
|
||||
Boolean flag = emailJobsService.createEmailJob(newEmailJobVo);
|
||||
int flag = emailJobsService.createEmailJob(newEmailJobVo);
|
||||
//if the message is inserted successfully, send a new email-job message to kafka
|
||||
if (flag){
|
||||
if (flag > 0){
|
||||
newEmailJobVo.setId(flag);
|
||||
kafkaUtil.sendMessage(gitlinkNewEmailRemindTopic, JSONObject.toJSONString(newEmailJobVo));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -26,9 +26,9 @@ public class EmailSentListener {
|
|||
|
||||
NewEmailJobVo newEmailJobVo = JSONObject.parseObject(message, NewEmailJobVo.class);
|
||||
//处理邮件任务
|
||||
emailService.DispatchEmailJobs(newEmailJobVo.getPlatform(), 1);
|
||||
emailService.DispatchEmailJobs(newEmailJobVo.getPlatform(), newEmailJobVo.getId());
|
||||
//发送邮件
|
||||
emailService.sendEmail(newEmailJobVo.getPlatform(), 100);
|
||||
emailService.sendEmail(newEmailJobVo.getPlatform(), newEmailJobVo.getId());
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -1,181 +1,172 @@
|
|||
version: '3'
|
||||
services:
|
||||
|
||||
mysql:
|
||||
image: mysql:${MYSQL_VERSION}
|
||||
container_name: ${MYSQL_CONTAINER_NAME}
|
||||
hostname: mysql
|
||||
environment:
|
||||
- MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD}
|
||||
- MYSQL_USER=${MYSQL_USER}
|
||||
- MYSQL_PASSWORD=${MYSQL_PASSWORD}
|
||||
- MYSQL_DATABASE=${MYSQL_DATABASE}
|
||||
- TZ=Asia/Shanghai
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/mysql:/var/lib/mysql
|
||||
- ${SQL_SCRIPT_PATH}/gns-notification.sql:/docker-entrypoint-initdb.d/0001.sql
|
||||
- ${SQL_SCRIPT_PATH}/hehui-gns-notification.sql:/docker-entrypoint-initdb.d/0002.sql
|
||||
- ${SQL_SCRIPT_PATH}/gns-email.sql:/docker-entrypoint-initdb.d/0003.sql
|
||||
- ${SQL_SCRIPT_PATH}/osredm-gns-notification.sql:/docker-entrypoint-initdb.d/0004.sql
|
||||
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
|
||||
ports:
|
||||
- ${MYSQL_LOCAL_PORT}:3306
|
||||
networks:
|
||||
- gitlink_network
|
||||
restart: always
|
||||
|
||||
redis:
|
||||
image: redis:${REDIS_VERSION}
|
||||
container_name: ${REDIS_CONTAINER_NAME}
|
||||
hostname: redis
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/redis/data:/data
|
||||
- ${DOCKER_DATA_PATH}/redis/logs:/logs
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
ports:
|
||||
- ${REDIS_LOCAL_PORT}:6379
|
||||
networks:
|
||||
- gitlink_network
|
||||
restart: always
|
||||
|
||||
# See Also: https://hub.docker.com/r/confluentinc/cp-zookeeper
|
||||
zookeeper:
|
||||
image: confluentinc/cp-zookeeper:latest
|
||||
container_name: ${ZOOKEEPER_CONTAINER_NAME}
|
||||
hostname: zookeeper
|
||||
environment:
|
||||
ZOOKEEPER_CLIENT_PORT: 2181
|
||||
ZOOKEEPER_TICK_TIME: 2000
|
||||
ports:
|
||||
- ${ZOOKEEPER_LOCAL_PORT}:2181
|
||||
# volumes:
|
||||
# - ${DOCKER_DATA_PATH}/zookeeper:/var/lib/zookeeper
|
||||
networks:
|
||||
- gitlink_network
|
||||
restart: always
|
||||
|
||||
# See Also: https://hub.docker.com/r/confluentinc/cp-kafka
|
||||
kafka1:
|
||||
image: confluentinc/cp-kafka:latest
|
||||
container_name: ${KAFKA_CONTAINER_01_NAME}
|
||||
hostname: kafka1
|
||||
depends_on:
|
||||
- zookeeper
|
||||
ports:
|
||||
- ${KAFKA_01_LOCAL_PORT}:29092
|
||||
# volumes:
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/lib:/var/lib/kafka
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/logs:/var/logs/kafka
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/conf:/etc/kafka
|
||||
environment:
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:${KAFKA_01_LOCAL_PORT}
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
networks:
|
||||
- gitlink_network
|
||||
restart: always
|
||||
|
||||
# See Also: https://hub.docker.com/r/confluentinc/cp-kafka
|
||||
kafka2:
|
||||
image: confluentinc/cp-kafka:latest
|
||||
container_name: ${KAFKA_CONTAINER_02_NAME}
|
||||
hostname: kafka2
|
||||
depends_on:
|
||||
- zookeeper
|
||||
ports:
|
||||
- ${KAFKA_02_LOCAL_PORT}:39092
|
||||
# volumes:
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/lib:/var/lib/kafka
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/logs:/var/logs/kafka
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/conf:/etc/kafka
|
||||
environment:
|
||||
KAFKA_BROKER_ID: 2
|
||||
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:${KAFKA_02_LOCAL_PORT}
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
networks:
|
||||
- gitlink_network
|
||||
restart: always
|
||||
|
||||
gitlink-reader:
|
||||
container_name: ${GNS_READER_CONTAINER_NAME}
|
||||
hostname: gitlink_reader
|
||||
image: gitlink/gns-reader:${GITLINK_NOTIFICATION_SYS_VERSION}
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: middleware/reader.Dockerfile
|
||||
networks:
|
||||
- gitlink_network
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
|
||||
depends_on:
|
||||
- kafka1
|
||||
- kafka2
|
||||
- redis
|
||||
- mysql
|
||||
ports:
|
||||
- ${GNS_READER_LOCAL_PORT}:8081
|
||||
restart: always
|
||||
|
||||
gitlink-writer:
|
||||
container_name: ${GNS_WRITER_CONTAINER_NAME}
|
||||
hostname: gitlink_writer
|
||||
image: gitlink/gns-writer:${GITLINK_NOTIFICATION_SYS_VERSION}
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: middleware/writer.Dockerfile
|
||||
networks:
|
||||
- gitlink_network
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
|
||||
depends_on:
|
||||
- kafka1
|
||||
- kafka2
|
||||
- redis
|
||||
- mysql
|
||||
ports:
|
||||
- ${GNS_WRITER_LOCAL_PORT}:8082
|
||||
restart: always
|
||||
|
||||
gitlink-executor:
|
||||
container_name: ${GNS_EXECUTOR_CONTAINER_NAME}
|
||||
hostname: gitlink_executor
|
||||
image: gitlink/gns-executor:${GITLINK_NOTIFICATION_SYS_VERSION}
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: middleware/executor.Dockerfile
|
||||
networks:
|
||||
- gitlink_network
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
depends_on:
|
||||
- kafka1
|
||||
- kafka2
|
||||
- redis
|
||||
- mysql
|
||||
ports:
|
||||
- ${GNS_EXECUTOR_LOCAL_PORT}:8083
|
||||
restart: always
|
||||
|
||||
networks:
|
||||
gitlink_network:
|
||||
driver: bridge
|
||||
name: gitlink_network
|
||||
driver_opts:
|
||||
com.docker.network.enable_ipv6: "true"
|
||||
|
||||
|
||||
|
||||
|
||||
version: '3'
|
||||
services:
|
||||
|
||||
mysql:
|
||||
image: mysql:${MYSQL_VERSION}
|
||||
container_name: ${MYSQL_CONTAINER_NAME}
|
||||
hostname: mysql
|
||||
environment:
|
||||
- MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD}
|
||||
- MYSQL_USER=${MYSQL_USER}
|
||||
- MYSQL_PASSWORD=${MYSQL_PASSWORD}
|
||||
- MYSQL_DATABASE=${MYSQL_DATABASE}
|
||||
- TZ=Asia/Shanghai
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/mysql:/var/lib/mysql
|
||||
- ${SQL_SCRIPT_PATH}/gns-notification.sql:/docker-entrypoint-initdb.d/0001.sql
|
||||
- ${SQL_SCRIPT_PATH}/hehui-gns-notification.sql:/docker-entrypoint-initdb.d/0002.sql
|
||||
- ${SQL_SCRIPT_PATH}/gns-email.sql:/docker-entrypoint-initdb.d/0003.sql
|
||||
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
|
||||
ports:
|
||||
- ${MYSQL_LOCAL_PORT}:3306
|
||||
networks:
|
||||
- gitlink_network
|
||||
|
||||
redis:
|
||||
image: redis:${REDIS_VERSION}
|
||||
container_name: ${REDIS_CONTAINER_NAME}
|
||||
hostname: redis
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/redis/data:/data
|
||||
- ${DOCKER_DATA_PATH}/redis/logs:/logs
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
ports:
|
||||
- ${REDIS_LOCAL_PORT}:6379
|
||||
networks:
|
||||
- gitlink_network
|
||||
|
||||
# See Also: https://hub.docker.com/r/confluentinc/cp-zookeeper
|
||||
zookeeper:
|
||||
image: confluentinc/cp-zookeeper:latest
|
||||
container_name: ${ZOOKEEPER_CONTAINER_NAME}
|
||||
hostname: zookeeper
|
||||
environment:
|
||||
ZOOKEEPER_CLIENT_PORT: 2181
|
||||
ZOOKEEPER_TICK_TIME: 2000
|
||||
ports:
|
||||
- ${ZOOKEEPER_LOCAL_PORT}:2181
|
||||
# volumes:
|
||||
# - ${DOCKER_DATA_PATH}/zookeeper:/var/lib/zookeeper
|
||||
networks:
|
||||
- gitlink_network
|
||||
|
||||
# See Also: https://hub.docker.com/r/confluentinc/cp-kafka
|
||||
kafka1:
|
||||
image: confluentinc/cp-kafka:latest
|
||||
container_name: ${KAFKA_CONTAINER_01_NAME}
|
||||
hostname: kafka1
|
||||
depends_on:
|
||||
- zookeeper
|
||||
ports:
|
||||
- ${KAFKA_01_LOCAL_PORT}:29092
|
||||
# volumes:
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/lib:/var/lib/kafka
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/logs:/var/logs/kafka
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_01_NAME}/conf:/etc/kafka
|
||||
environment:
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:${KAFKA_01_LOCAL_PORT}
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
networks:
|
||||
- gitlink_network
|
||||
|
||||
# See Also: https://hub.docker.com/r/confluentinc/cp-kafka
|
||||
kafka2:
|
||||
image: confluentinc/cp-kafka:latest
|
||||
container_name: ${KAFKA_CONTAINER_02_NAME}
|
||||
hostname: kafka2
|
||||
depends_on:
|
||||
- zookeeper
|
||||
ports:
|
||||
- ${KAFKA_02_LOCAL_PORT}:39092
|
||||
# volumes:
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/lib:/var/lib/kafka
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/logs:/var/logs/kafka
|
||||
# - ${DOCKER_DATA_PATH}/kafka/${KAFKA_CONTAINER_02_NAME}/conf:/etc/kafka
|
||||
environment:
|
||||
KAFKA_BROKER_ID: 2
|
||||
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,PLAINTEXT_HOST://localhost:${KAFKA_02_LOCAL_PORT}
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
networks:
|
||||
- gitlink_network
|
||||
|
||||
gitlink-reader:
|
||||
container_name: ${GNS_READER_CONTAINER_NAME}
|
||||
hostname: gitlink_reader
|
||||
image: gitlink/gns-reader:${GITLINK_NOTIFICATION_SYS_VERSION}
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: middleware/reader.Dockerfile
|
||||
networks:
|
||||
- gitlink_network
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
|
||||
depends_on:
|
||||
- kafka1
|
||||
- kafka2
|
||||
- redis
|
||||
- mysql
|
||||
ports:
|
||||
- ${GNS_READER_LOCAL_PORT}:8081
|
||||
|
||||
gitlink-writer:
|
||||
container_name: ${GNS_WRITER_CONTAINER_NAME}
|
||||
hostname: gitlink_writer
|
||||
image: gitlink/gns-writer:${GITLINK_NOTIFICATION_SYS_VERSION}
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: middleware/writer.Dockerfile
|
||||
networks:
|
||||
- gitlink_network
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
|
||||
depends_on:
|
||||
- kafka1
|
||||
- kafka2
|
||||
- redis
|
||||
- mysql
|
||||
ports:
|
||||
- ${GNS_WRITER_LOCAL_PORT}:8082
|
||||
|
||||
gitlink-executor:
|
||||
container_name: ${GNS_EXECUTOR_CONTAINER_NAME}
|
||||
hostname: gitlink_executor
|
||||
image: gitlink/gns-executor:${GITLINK_NOTIFICATION_SYS_VERSION}
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: middleware/executor.Dockerfile
|
||||
networks:
|
||||
- gitlink_network
|
||||
volumes:
|
||||
- ${DOCKER_DATA_PATH}/gitlink/:/data/logs/
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
depends_on:
|
||||
- kafka1
|
||||
- kafka2
|
||||
- redis
|
||||
- mysql
|
||||
ports:
|
||||
- ${GNS_EXECUTOR_LOCAL_PORT}:8083
|
||||
|
||||
networks:
|
||||
gitlink_network:
|
||||
driver: bridge
|
||||
name: gitlink_network
|
||||
driver_opts:
|
||||
com.docker.network.enable_ipv6: "true"
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -13,6 +13,8 @@ public class NewEmailJobVo {
|
|||
//平台编码
|
||||
private String platform;
|
||||
|
||||
private Integer id;
|
||||
|
||||
@ApiModelProperty(value = "邮件发送者", required = true)
|
||||
@NotNull(message = "邮件发送者不能为空")
|
||||
@Range(min = Integer.MIN_VALUE, max = Integer.MAX_VALUE)
|
||||
|
@ -40,6 +42,14 @@ public class NewEmailJobVo {
|
|||
this.platform = platform;
|
||||
}
|
||||
|
||||
public Integer getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(Integer id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public Integer getSender() {
|
||||
return sender;
|
||||
}
|
||||
|
|
|
@ -19,5 +19,5 @@ public interface EmailJobsMapper extends BaseMapper<EmailJob> {
|
|||
|
||||
int updateByPrimaryKey(@Param("platform") String platform, @Param("record") EmailJob record);
|
||||
|
||||
List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("size") Integer size);
|
||||
List<EmailJob> getEmailJobsByDispatchedStatus(@Param("platform") String platform, @Param("dispatchedStatus") Integer dispatchedStatus, @Param("id") Integer emailJobId);
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ public interface EmailSendRecordsMapper extends BaseMapper<EmailSendRecord> {
|
|||
//批量插入邮件发送任务记录
|
||||
int insertEmailSendRecordBatch(@Param("platform") String platform,@Param("list") List<EmailSendRecord> emailSendRecordList);
|
||||
|
||||
List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("size") Integer sentNumber);
|
||||
List<EmailSendRecord> getRecordsByStatus(@Param("platform") String platform, @Param("status") Integer status, @Param("jobId") Integer emailJobId);
|
||||
|
||||
//批量更新邮件发送任务记录
|
||||
int updateEmailSendRecordsBatch(@Param("platform") String platform, @Param("list") List<EmailSendRecord> emailSendRecordList);
|
||||
|
|
|
@ -39,7 +39,6 @@ public interface SysNotificationMapper extends BaseMapper<SysNotification> {
|
|||
|
||||
List<SysNotification> getSysNotificationPageList(Page page, String orderBy,
|
||||
@Param("type") int type,
|
||||
@Param("sources") String sources,
|
||||
@Param("platform") String platform,
|
||||
@Param("receiver") Integer receiver,
|
||||
@Param("status") Integer status);
|
||||
|
|
|
@ -13,23 +13,23 @@ public interface EmailJobsService extends IService<EmailJob> {
|
|||
* 添加发送邮件任务
|
||||
*
|
||||
* @param newEmailJobVo
|
||||
* @return: boolean
|
||||
* @return: int
|
||||
* @Author: wanjia
|
||||
* @Date: 2021/9/13
|
||||
*/
|
||||
boolean createEmailJob(NewEmailJobVo newEmailJobVo) throws Exception;
|
||||
int createEmailJob(NewEmailJobVo newEmailJobVo) throws Exception;
|
||||
|
||||
/**
|
||||
* 获取所有未处理邮件任务列表
|
||||
*
|
||||
* @param platform 平台编码
|
||||
* @param dispatchedStatus 发送状态:-1 未处理,1 处理成功,2 处理失败
|
||||
* @param size 列表大小
|
||||
* @param emailJobId 邮件任务id
|
||||
* @return: List<EmailJob>
|
||||
* @Author: wanjia
|
||||
* @Date: 2021/9/13
|
||||
*/
|
||||
List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) throws Exception;
|
||||
List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer emailJobId) throws Exception;
|
||||
|
||||
/**
|
||||
* 更新邮件任务状态
|
||||
|
|
|
@ -23,12 +23,12 @@ public interface EmailSendRecordsService extends IService<EmailSendRecord> {
|
|||
*
|
||||
* @param platform 平台编码
|
||||
* @param status 邮件发送记录状态 -1未发送 1发送成功 2发送失败
|
||||
* @param size 列表数量
|
||||
* @param emailJobId 邮件任务id
|
||||
* @return: List<EmailSendRecord>
|
||||
* @Author: wanjia
|
||||
* @Date: 2021/9/15
|
||||
*/
|
||||
List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer size) throws Exception;
|
||||
List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer emailJobId) throws Exception;
|
||||
|
||||
/**
|
||||
* 变更邮件发送记录状态
|
||||
|
|
|
@ -36,17 +36,16 @@ public interface SysNotificationService extends IService<SysNotification> {
|
|||
/**
|
||||
* 获取消息列表
|
||||
*
|
||||
* @param type 类型 -1 全部 1 系统消息、2 @我
|
||||
* @param page 页码
|
||||
* @param size 页大小
|
||||
* @param platform 平台编号
|
||||
* @param receiver 消息接收者
|
||||
* @param status 状态 -1 全部、1 未读 2 已读
|
||||
* @param type 类型 -1 全部 1 系统消息、2 @我
|
||||
* @param sources
|
||||
* @param page 页码
|
||||
* @param size 页大小
|
||||
* @return
|
||||
*/
|
||||
|
||||
Page<SysNotification> getNotification(String platform, Integer receiver, Integer status, Integer type, String sources, Integer page, Integer size) throws Exception;
|
||||
Page<SysNotification> getNotification(String platform, Integer receiver, Integer status, Integer type, Integer page, Integer size) throws Exception;
|
||||
|
||||
/**
|
||||
* @Description: 批量删除系统消息
|
||||
|
|
|
@ -15,16 +15,17 @@ import java.util.List;
|
|||
public class EmailJobsServiceImpl extends ServiceImpl<EmailJobsMapper, EmailJob> implements EmailJobsService {
|
||||
|
||||
@Override
|
||||
public boolean createEmailJob(NewEmailJobVo newEmailJobVo) {
|
||||
public int createEmailJob(NewEmailJobVo newEmailJobVo) {
|
||||
String platform = newEmailJobVo.getPlatform();
|
||||
EmailJob emailJob = new EmailJob();
|
||||
BeanUtils.copyProperties(newEmailJobVo, emailJob);
|
||||
return baseMapper.insertSelective(platform, emailJob) > 0;
|
||||
baseMapper.insertSelective(platform, emailJob);
|
||||
return newEmailJobVo.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer size) {
|
||||
return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, size);
|
||||
public List<EmailJob> getEmailJobsByDispatchedStatus(String platform,Integer dispatchedStatus, Integer emailJobId) {
|
||||
return baseMapper.getEmailJobsByDispatchedStatus(platform,dispatchedStatus, emailJobId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -35,7 +35,7 @@ public class EmailSendRecordsServiceImpl extends ServiceImpl<EmailSendRecordsMap
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer sentNumber){
|
||||
return baseMapper.getRecordsByStatus(platform, status, sentNumber);
|
||||
public List<EmailSendRecord> getRecordsByStatus(String platform, Integer status, Integer emailJobId){
|
||||
return baseMapper.getRecordsByStatus(platform, status, emailJobId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
|
|||
|
||||
@Override
|
||||
@Transactional
|
||||
public boolean sendNotification(NewSysNotificationVo newSysNotificationVo) {
|
||||
public boolean sendNotification(NewSysNotificationVo newSysNotificationVo) throws Exception {
|
||||
List<SysNotification> sysNotificationList = new ArrayList<>();
|
||||
List<String> list = Arrays.asList(newSysNotificationVo.getReceivers().split(","));
|
||||
for (String receiver : list) {
|
||||
|
@ -51,7 +51,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
|
|||
}
|
||||
|
||||
@Override
|
||||
public int markNotificationAs(String platform, Integer receiver, String notificationIds, Integer status, Integer type) {
|
||||
public int markNotificationAs(String platform, Integer receiver, String notificationIds, Integer status, Integer type) throws Exception {
|
||||
int count = baseMapper.updateStatusByNotificationId(platform, receiver, notificationIds, status, type);
|
||||
if (count > 0) {
|
||||
this.delUserCache(platform, receiver);
|
||||
|
@ -60,7 +60,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
|
|||
}
|
||||
|
||||
@Override
|
||||
public int getNotificationCount(String platform, Integer receiver, Integer type, Integer status) {
|
||||
public int getNotificationCount(String platform, Integer receiver, Integer type, Integer status) throws Exception {
|
||||
String cacheKey = cacheKeyForCount(platform, receiver, type, status);
|
||||
Object foundResult = this.redisUtil.get(cacheKey);
|
||||
if (foundResult != null) {
|
||||
|
@ -73,8 +73,8 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
|
|||
|
||||
|
||||
@Override
|
||||
public Page<SysNotification> getNotification(String platform, Integer receiver, Integer status, Integer type, String sources, Integer page, Integer size) {
|
||||
String cacheKey = cacheKeyForPage(platform, receiver, type, sources, status, page, size);
|
||||
public Page<SysNotification> getNotification(String platform, Integer receiver, Integer status, Integer type, Integer page, Integer size) throws Exception {
|
||||
String cacheKey = cacheKeyForPage(platform, receiver, type, status, page, size);
|
||||
Object foundResult = this.redisUtil.get(cacheKey);
|
||||
if (foundResult != null) {
|
||||
return (Page<SysNotification>) foundResult;
|
||||
|
@ -82,7 +82,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
|
|||
|
||||
Page<SysNotification> pageItem = new Page<SysNotification>(page, size);
|
||||
List<SysNotification> sysNotificationList = baseMapper.getSysNotificationPageList(
|
||||
pageItem, "", type, sources, platform, receiver, status
|
||||
pageItem, "", type, platform, receiver, status
|
||||
);
|
||||
pageItem.setRecords(sysNotificationList);
|
||||
this.redisUtil.set(cacheKey, pageItem);
|
||||
|
@ -90,7 +90,7 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
|
|||
}
|
||||
|
||||
@Override
|
||||
public int deleteNotifications(String platform, Integer receiver, String notificationIds, Integer type) {
|
||||
public int deleteNotifications(String platform, Integer receiver, String notificationIds, Integer type) throws Exception {
|
||||
int count = baseMapper.deleteNotificationByIds(platform, receiver, notificationIds, type);
|
||||
if (count > 0) {
|
||||
this.delUserCache(platform, receiver);
|
||||
|
@ -107,8 +107,8 @@ public class SysNotificationServiceImpl extends ServiceImpl<SysNotificationMappe
|
|||
return String.format("%s#T%s#S%s#Count", cachePrefixForPlatform(platform, receiver), type, status);
|
||||
}
|
||||
|
||||
private static String cacheKeyForPage(String platform, Integer receiver, Integer type, String sources, Integer status, Integer page, Integer size) {
|
||||
return String.format("%s#T%s#%s#S%s#P%s_S%s", cachePrefixForPlatform(platform, receiver), type, sources, status, page, size);
|
||||
private static String cacheKeyForPage(String platform, Integer receiver, Integer type, Integer status, Integer page, Integer size) {
|
||||
return String.format("%s#T%s#S%s#P%s_S%s", cachePrefixForPlatform(platform, receiver), type, status, page, size);
|
||||
}
|
||||
|
||||
private static String cachePrefixForPlatform(String platform, Integer receiver) {
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
#{record.subject,jdbcType=VARCHAR}, #{record.content,jdbcType=VARCHAR}, #{record.createdAt,jdbcType=TIMESTAMP},
|
||||
#{record.dispatchedAt,jdbcType=TIMESTAMP}, #{record.dispatchedStatus,jdbcType=INTEGER})
|
||||
</insert>
|
||||
<insert id="insertSelective" parameterType="cn.org.gitlink.notification.model.dao.entity.EmailJob" >
|
||||
<insert id="insertSelective" parameterType="cn.org.gitlink.notification.model.dao.entity.EmailJob" useGeneratedKeys="true" keyProperty="id" >
|
||||
insert into ${platform}_email_jobs
|
||||
<trim prefix="(" suffix=")" suffixOverrides="," >
|
||||
<if test="record.id != null" >
|
||||
|
@ -127,9 +127,6 @@
|
|||
</update>
|
||||
<select id="getEmailJobsByDispatchedStatus" resultMap="BaseResultMap">
|
||||
select * from ${platform}_email_jobs
|
||||
where dispatched_status = #{dispatchedStatus} order by id desc
|
||||
<if test="size != null">
|
||||
limit #{size}
|
||||
</if>
|
||||
where dispatched_status = #{dispatchedStatus} and id = #{id}
|
||||
</select>
|
||||
</mapper>
|
|
@ -121,10 +121,7 @@
|
|||
select a.id, a.email, a.job_id, a.created_at, a.sent_at, a.status, b.subject, b.content
|
||||
from ${platform}_email_send_records a
|
||||
left join ${platform}_email_jobs b on a.job_id = b.id
|
||||
where status = #{status} order by id desc
|
||||
<if test="size != null">
|
||||
limit #{size}
|
||||
</if>
|
||||
where status = #{status} and job_id = #{jobId}
|
||||
</select>
|
||||
<update id="updateEmailSendRecordsBatch" parameterType="list">
|
||||
update ${platform}_email_send_records
|
||||
|
|
|
@ -202,12 +202,6 @@
|
|||
<if test="status != -1">
|
||||
and status = #{status}
|
||||
</if>
|
||||
<if test="sources != null and sources != ''">
|
||||
and source in
|
||||
<foreach collection="sources.split(',')" item="sources" open="(" separator="," close=")">
|
||||
#{sources}
|
||||
</foreach>
|
||||
</if>
|
||||
ORDER BY id DESC
|
||||
</select>
|
||||
<update id="deleteNotificationByIds">
|
||||
|
|
|
@ -28,7 +28,7 @@ public class ServiceTests {
|
|||
@Test
|
||||
public void testSysNotificationService() throws Exception {
|
||||
int i = sysNotificationService.getNotificationCount("gitlink", 234,1,1);
|
||||
Page<SysNotification> sysNotificationPage = sysNotificationService.getNotification("gitlink", 100,1,1, "IssueChanged", 1,20);
|
||||
Page<SysNotification> sysNotificationPage = sysNotificationService.getNotification("gitlink", 100,1,1,1,20);
|
||||
NewSysNotificationVo newSysNotificationVo = new NewSysNotificationVo();
|
||||
newSysNotificationVo.setSender(1);
|
||||
newSysNotificationVo.setReceivers("7,8");
|
||||
|
|
12
pom.xml
12
pom.xml
|
@ -79,6 +79,18 @@
|
|||
<artifactId>spring-boot-starter-mail</artifactId>
|
||||
<version>${springboot.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<version>2.15.0</version>
|
||||
</dependency>
|
||||
<!--https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core-->
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<version>2.15.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -52,9 +52,6 @@ public class NotificationController {
|
|||
@ApiParam(value = "消息类型:值为-1时,获取全部信息;值为1时,获取系统消息;值为2时,获取@我消息", defaultValue = "-1")
|
||||
@RequestParam(name = "type", required = false, defaultValue = "-1") Integer type,
|
||||
|
||||
@ApiParam(value = "消息来源")
|
||||
@RequestParam(name = "sources", required = false) String sources,
|
||||
|
||||
@ApiParam(value = "页码:值为-1时(默认值),不开启分页;", required = false, defaultValue = "-1")
|
||||
@RequestParam(name = "page", required = false, defaultValue = "-1") Integer page,
|
||||
|
||||
|
@ -91,7 +88,7 @@ public class NotificationController {
|
|||
}
|
||||
|
||||
//分页的数据
|
||||
Page foundPage = notificationService.getNotification(platform, receiver, status, type, sources, page, size);
|
||||
Page foundPage = notificationService.getNotification(platform, receiver, status, type, page, size);
|
||||
if (foundPage != null) {
|
||||
notificationListVo.setPageNum(foundPage.getCurrent());
|
||||
notificationListVo.setPageSize(foundPage.getSize());
|
||||
|
|
Loading…
Reference in New Issue