MONGO是一个NOSQL型的文档数据库,最近拿ES做业务,结果跪了,光一个一秒的刷新延时就实在让人头大了,所以需要一个中间的数据库做写业务,然后搜索的业务单独拿ES做。最后选型上选了MONGO作为中间的业务数据库,加之3.6版本后的MONGO自带了用于订阅数据操作的功能——change stream。
概述
MONGO的change stream和阿里的数据同步工具canal基于的原理是一样的,都是将自己作为slave,然后发送订阅请求,写节点主动推送数据给订阅方。在3.6版本之前,MONGO都是通过读OPLOG解析数据操作,然后同步数据,现在有官方提供的原生支持,自然是用change stream是最好的。
前置条件
既然是要将自己做为slave来接受数据推送,那MONGO本身自然是要复制集,使用该订阅功能的前置条件有两个,如下:
- 数据库支持复制集及分片集群;
- 存储引擎类型是WiredTiger;
执行命令db.serverStatus
在storageEngine一节可以看到当前的存储引擎的类型,
在repl一节可以看到复制集的基本信息,假如没有看到这一节的信息,证明当前不在复制集内。
{
"host" : "7769b3b73c8t",
"version" : "3.6.13",
"process" : "mongod",
"pid" : 1,
"storageEngine" : {
"name" : "wiredTiger",
"supportsCommittedReads" : true,
"readOnly" : false,
"persistent" : true
},
"repl" : {
"hosts" : [
"mongo-node1:27017",
"mongo-node2:27017"
],
"setName" : "rs0",
"setVersion" : 1,
"ismaster" : true,
"secondary" : false,
"primary" : "mongo-node1:27017",
"me" : "mongo-node1:27017",
"electionId" : ObjectId("7fffffff0000000000000002"),
"lastWrite" : {
"opTime" : {
"ts" : Timestamp(1563528545, 1),
"t" : 2
},
"lastWriteDate" : ISODate("2019-07-19T17:29:05.000+08:00"),
"majorityOpTime" : {
"ts" : Timestamp(1563528545, 1),
"t" : 2
},
"majorityWriteDate" : ISODate("2019-07-19T17:29:05.000+08:00")
},
"rbid" : 1
},
"uptime" : 9388,
"uptimeMillis" : 9388110,
"uptimeEstimate" : 9388,
"localTime" : ISODate("2019-07-19T17:29:09.451+08:00"),
}
复制集方式启动MONGO
以docker方式做示例,其他部署方式类推。
创建网络
创建一个主节点库和从节点库共同连接的docker网络。
docker network create -d bridge mongo-cluster
创建主节点
创建能读写的主节点,并指定复制集。
docker run -d \
--name mongo-node1 \
-p 27017:27017 \
--network mongo-cluster
mongo:3.6.13
--replSet "rs0"
登录mongo shell。
docker exec -it mongo-node1 mongo
初始化复制集。
rs.initiate({
_id : "rs0",
members : [
{
_id : 0,
host : "mongo-node1:27017"
}
]
})
创建数据库及文档以做测试之用。
use test
db.createCollection('user')
db.user.insertOne({
"name" : "jam"
})
以下是在NOSQLBOOSTER的shell中执行的测试demo。打开两个shell窗口,切换到test数据库,先在一个shell中开启一个监听user文档插入事件的方法,再在另一个shell插入数据。
const col = db.getCollection('user')
const cs = col.watch([{
$match: {
operationType: "insert"
}
}])
cs.on('change', (it) => {
console.log(tojson(it))
})
sleep(10000)
db.user.insertOne({
"name" : "jam"
})
测试demo执行成功后,可以在控制台看到如下的输出结果。
spring boot change stream demo
创建maven工程,加入依赖,spring data mongo(皮实) 及 lombok (耐操)。
依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
主要步骤及代码
change stream 监听任务的创建过程分为四步:
- 创建 MessageListenerContainer 类用于注册监听任务;
- 创建处理的回调类MessageListener,对指定类型的数据操作做处理;
- 创建changestream选项参数类ChangeStreamOptions;
- 启动监听容器并注册监听任务,需要指定collection name,监听器及changestream选项参数。
- 在应用关闭后停止容器并注销监听任务;
一些重要步骤的代码:
创建监听器容器并初始化(启动)。
public ChangeStreamListener(MongoTemplate template) {
this.listenerContainer = new DefaultMessageListenerContainer(template);
this.databaseName = template.getDb().getName();
this.init();
}
private void init() {
listenerContainer.start();
}
回调方法中针对操作类型执行处理方法,回调方法此处我抽了一个接口出来。
private <T> MessageListener<ChangeStreamDocument<Document>, T> createMessageListener(CallbackChangeStream<T> callback) {
return listenMsg -> {
ChangeStreamDocument<Document> raw = listenMsg.getRaw();
OperationType operationType = Objects.requireNonNull(raw).getOperationType();
switch (operationType) {
case INSERT:
callback.insert(raw, listenMsg.getBody());
break;
case REPLACE:
case UPDATE:
callback.update(raw, listenMsg.getBody());
break;
case DELETE:
callback.delete(raw, listenMsg.getBody());
break;
default:
break;
}
};
}
注册监听任务,其中subscription.await(Duration.ofSeconds(10))
设置了10秒的时间。
private <T> void registerListener(String collectionName, MessageListener<ChangeStreamDocument<Document>, T> listener, Class<T> bodyType, ChangeStreamOptions changeStreamOptions) {
ChangeStreamRequest.ChangeStreamRequestOptions options = new ChangeStreamRequest.ChangeStreamRequestOptions(this.databaseName, collectionName, changeStreamOptions);
Subscription subscription = listenerContainer.register(new ChangeStreamRequest<>(listener, options), bodyType);
try {
if (subscription.await(Duration.ofSeconds(10))) {
log.info("collection:{} listener register success! ", collectionName);
} else {
log.info("collection:{} listener register fail or time out! ", collectionName);
}
} catch (InterruptedException e) {
log.error("interrupted! ", e);
Thread.currentThread().interrupt();
}
}
在抽出的接口中实现了各种操作的回调方法,并返回构造好的监听器,User类是指定名称的实体映射类。
private MessageListener<ChangeStreamDocument<Document>, User> userListener() {
return createMessageListener(new CallbackChangeStream<User>() {
@Override
public void insert(ChangeStreamDocument<Document> raw, User body) {
log.info("new user is inserted, content is {}.", body.toString());
}
@Override
public void update(ChangeStreamDocument<Document> raw, User body) {
log.info("user is updated, content is {}.", body);
}
@Override
public void delete(ChangeStreamDocument<Document> raw, User body) {
log.info("user of {} is deleted!", raw.getDocumentKey().getObjectId("_id").getValue().toHexString());
}
});
}
在应用退出前做一些资源释放和处理。
@PreDestroy
private void destroy() {
if (listenerContainer != null && listenerContainer.isRunning()) {
this.listenerContainer.stop();
}
}
启动应用,在分别执行插入,更新和删除操作后的控制台输出如下:
demo项目的地址放在github上,地址在这里。
总结
change stream用起来还是比3.6直接读oplog去同步数据要舒服一些的,但4.0版本之后的mongo才提供了事务支持,所以用着还是有点别扭😑。到现在为止对于mongo最大的使用感受就是能搞一起就搞到一起,数据冗余不可怕,文档过于分散一旦有跨文档查询的需求时就直接当场去世😔。