MONGO是一个NOSQL型的文档数据库,最近拿ES做业务,结果跪了,光一个一秒的刷新延时就实在让人头大了,所以需要一个中间的数据库做写业务,然后搜索的业务单独拿ES做。最后选型上选了MONGO作为中间的业务数据库,加之3.6版本后的MONGO自带了用于订阅数据操作的功能——change stream。

概述

MONGO的change stream和阿里的数据同步工具canal基于的原理是一样的,都是将自己作为slave,然后发送订阅请求,写节点主动推送数据给订阅方。在3.6版本之前,MONGO都是通过读OPLOG解析数据操作,然后同步数据,现在有官方提供的原生支持,自然是用change stream是最好的。

前置条件

既然是要将自己做为slave来接受数据推送,那MONGO本身自然是要复制集,使用该订阅功能的前置条件有两个,如下:

  1. 数据库支持复制集及分片集群;
  2. 存储引擎类型是WiredTiger;

执行命令db.serverStatusstorageEngine一节可以看到当前的存储引擎的类型, 在repl一节可以看到复制集的基本信息,假如没有看到这一节的信息,证明当前不在复制集内。

{
    "host" : "7769b3b73c8t",
    "version" : "3.6.13",
    "process" : "mongod",
    "pid" : 1,
    "storageEngine" : {
        "name" : "wiredTiger",
        "supportsCommittedReads" : true,
        "readOnly" : false,
        "persistent" : true
    },
    "repl" : {
        "hosts" : [
            "mongo-node1:27017",
            "mongo-node2:27017"
        ],
        "setName" : "rs0",
        "setVersion" : 1,
        "ismaster" : true,
        "secondary" : false,
        "primary" : "mongo-node1:27017",
        "me" : "mongo-node1:27017",
        "electionId" : ObjectId("7fffffff0000000000000002"),
        "lastWrite" : {
            "opTime" : {
                "ts" : Timestamp(1563528545, 1),
                "t" : 2
            },
            "lastWriteDate" : ISODate("2019-07-19T17:29:05.000+08:00"),
            "majorityOpTime" : {
                "ts" : Timestamp(1563528545, 1),
                "t" : 2
            },
            "majorityWriteDate" : ISODate("2019-07-19T17:29:05.000+08:00")
        },
        "rbid" : 1
    },
    "uptime" : 9388,
    "uptimeMillis" : 9388110,
    "uptimeEstimate" : 9388,
    "localTime" : ISODate("2019-07-19T17:29:09.451+08:00"),
}

复制集方式启动MONGO

以docker方式做示例,其他部署方式类推。

创建网络

创建一个主节点库和从节点库共同连接的docker网络。

docker network create -d bridge mongo-cluster

创建主节点

创建能读写的主节点,并指定复制集。

docker run -d \
           --name mongo-node1 \
           -p 27017:27017 \
           --network mongo-cluster
           mongo:3.6.13
           --replSet "rs0"

登录mongo shell。

docker exec -it mongo-node1 mongo

初始化复制集。

rs.initiate({
    _id : "rs0",
    members : [
        {
            _id : 0,
            host : "mongo-node1:27017"
        }
    ]
})

创建数据库及文档以做测试之用。

use test
db.createCollection('user')
db.user.insertOne({
    "name" : "jam"
})

以下是在NOSQLBOOSTER的shell中执行的测试demo。打开两个shell窗口,切换到test数据库,先在一个shell中开启一个监听user文档插入事件的方法,再在另一个shell插入数据。

const col = db.getCollection('user')
const cs = col.watch([{
    $match: {
        operationType: "insert"
    }
}])

cs.on('change', (it) => {
    console.log(tojson(it))
})

sleep(10000)
db.user.insertOne({
    "name" : "jam"
})

测试demo执行成功后,可以在控制台看到如下的输出结果。

change stream 监听成功的结果

spring boot change stream demo

创建maven工程,加入依赖,spring data mongo(皮实) 及 lombok (耐操)。

依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

主要步骤及代码

change stream 监听任务的创建过程分为四步:

  1. 创建 MessageListenerContainer 类用于注册监听任务;
  2. 创建处理的回调类MessageListener,对指定类型的数据操作做处理;
  3. 创建changestream选项参数类ChangeStreamOptions;
  4. 启动监听容器并注册监听任务,需要指定collection name,监听器及changestream选项参数。
  5. 在应用关闭后停止容器并注销监听任务;

一些重要步骤的代码:

创建监听器容器并初始化(启动)。

public ChangeStreamListener(MongoTemplate template) {
    this.listenerContainer = new DefaultMessageListenerContainer(template);
    this.databaseName = template.getDb().getName();
    this.init();
}

private void init() {
    listenerContainer.start();
}

回调方法中针对操作类型执行处理方法,回调方法此处我抽了一个接口出来。

private <T> MessageListener<ChangeStreamDocument<Document>, T> createMessageListener(CallbackChangeStream<T> callback) {
    return listenMsg -> {
        ChangeStreamDocument<Document> raw = listenMsg.getRaw();
        OperationType operationType = Objects.requireNonNull(raw).getOperationType();
        switch (operationType) {
            case INSERT:
                callback.insert(raw, listenMsg.getBody());
                break;
            case REPLACE:
            case UPDATE:
                callback.update(raw, listenMsg.getBody());
                break;
            case DELETE:
                callback.delete(raw, listenMsg.getBody());
                break;
            default:
                break;
        }
    };
}

注册监听任务,其中subscription.await(Duration.ofSeconds(10))设置了10秒的时间。

private <T> void registerListener(String collectionName, MessageListener<ChangeStreamDocument<Document>, T> listener, Class<T> bodyType, ChangeStreamOptions changeStreamOptions) {
    ChangeStreamRequest.ChangeStreamRequestOptions options = new ChangeStreamRequest.ChangeStreamRequestOptions(this.databaseName, collectionName, changeStreamOptions);
    Subscription subscription = listenerContainer.register(new ChangeStreamRequest<>(listener, options), bodyType);
    try {
        if (subscription.await(Duration.ofSeconds(10))) {
            log.info("collection:{} listener register success! ", collectionName);
        } else {
            log.info("collection:{} listener register fail or time out! ", collectionName);
        }
    } catch (InterruptedException e) {
        log.error("interrupted! ", e);
        Thread.currentThread().interrupt();
    }
}

在抽出的接口中实现了各种操作的回调方法,并返回构造好的监听器,User类是指定名称的实体映射类。

private MessageListener<ChangeStreamDocument<Document>, User> userListener() {
    return createMessageListener(new CallbackChangeStream<User>() {
        @Override
        public void insert(ChangeStreamDocument<Document> raw, User body) {
            log.info("new user is inserted, content is {}.", body.toString());
        }

        @Override
        public void update(ChangeStreamDocument<Document> raw, User body) {
            log.info("user is updated, content is {}.", body);
        }

        @Override
        public void delete(ChangeStreamDocument<Document> raw, User body) {
            log.info("user of {} is deleted!", raw.getDocumentKey().getObjectId("_id").getValue().toHexString());
        }
    });
}

在应用退出前做一些资源释放和处理。

@PreDestroy
private void destroy() {
    if (listenerContainer != null && listenerContainer.isRunning()) {
        this.listenerContainer.stop();
    }
}

启动应用,在分别执行插入,更新和删除操作后的控制台输出如下:

demo执行结果

demo项目的地址放在github上,地址在这里

总结

change stream用起来还是比3.6直接读oplog去同步数据要舒服一些的,但4.0版本之后的mongo才提供了事务支持,所以用着还是有点别扭😑。到现在为止对于mongo最大的使用感受就是能搞一起就搞到一起,数据冗余不可怕,文档过于分散一旦有跨文档查询的需求时就直接当场去世😔。