Mongodb的oplog详解
Oplog是MongoDB实现复制集的关键数据结构,在复制集中Primary对数据库操作之后就会产生一个Oplog文档保存在local.oplog.rs集合中,Secondary成员会拉取Primary的Oplog并重放相同的操作,从而达到Secondary成员与Primary有一致的数据。实际上复制集中每一个成员都会保存Oplog,其他成员会根据连接延迟等因数选择最近的成员拉取Oplog数据。
Oplog存在集合local.oplog.rs,这是系统内置集合,一个cappedcollection,即是这个collection有固定大小,一旦写满数据会从头开始写入,就像一个圆形的队列结构。这个collection大小在初始化集群时设置,默认的大小是5%的空闲磁盘空间,也可以在配置文件设置oplogSizeMB选项,或者在启动MongoDB后使用replSetResizeOplog命令动态设置collection大小。
Oplog与MongoDB的其他的文档没有什么不同,它固定有一些属性:
- ts:MongoDB的内置的特殊时间戳数据结构,如Timestamp(1503110518,1),由秒级的Unix时间戳和一个顺序增长的整数increment表示。长度为64位,其中Unix时间戳占32位,后32位可以保存同一秒内的第几次操作。
- h:hash值代表每个Oplog的唯一标识。
- v:Oplog版本
- ns:namespace命名空间,数据库+集合,用database.collection表示。但如果是表操作命令等,变成database.$cmd。
- op:operationtype,操作类型,包含以下几种:
- i:insert,插入文档
- u:update,更新文档
- d:delete,删除文档
- c:command,操作命令,如createIndex等
- n:空操作,用于空闲时主从同步Oplog时间信息
- o:operation,Oplog操作的具体内容,例如ioperationtype,o即是插入的文档。对于uoperationtype,只更新部分内容,o键的内容为{$set:{...}}
- o2:用于update操作,包含_id属性值。
Oplog的重放是幂等(idempotent)的,即是说同一个Oplog重放多次最终结果还是一致的。这是MongoDB将许多命令操作进行了转化,保持生成的Oplog是可以幂等的,如执行以下$inc操作:
db.test.update({_id:ObjectId("533022d70d7e2c31d4490d22")},{$inc:{count:1}})
产生的Oplog为:
{ "ts":Timestamp(1503110518,1), "t":NumberLong(8), "h":NumberLong(-3967772133090765679), "v":NumberInt(2), "op":"u", "ns":"mongo.test", "o2":{ "_id":ObjectId("533022d70d7e2c31d4490d22") }, "o":{ "$set":{ "count":2.0 } } }
以上MongoDB可以保证Oplog的数据操作(DML语句)是幂等的,但数据表操作(DDL语句)命令无法保证,例如重复执行相同的createIndex命令。
Oplog的查询
Cappedcollection内文档是以插入顺序排序的,没有其他索引,但是local.oplog.rs是一个特殊的cappedcollection,在Wiredtiger引擎的话,Oplog的时间戳会作为一个特殊的元信息存储,使得Oplog可以以ts字段排序,查询Oplog时可以利用ts字段筛选。
一般来说Secondary同步需要经过initialsync和incrementalsync,initialsync同步完成后,需拉取从同步时间点开始之后的Oplog进行持续重放。所以查询Oplog的操作一般是:
db.oplog.rs.find({$gte:{'ts':Timestamp(1503110518,1)}})
Secondary需要不断获取Primary产生的Oplog,复制集会使用tailablecursor持续获取Oplog数据,非常类似Unix系统的tail-f。这会提高效率,因为一般的cursor使用完毕后就会关闭,而tailablecursor会保存上次的id,并持续获取数据。
如果使用pymongo驱动器,则定位从某个时间点之后的Oplog可以这麽写:
coll=db['local'].get_collection( 'oplog.rs', codec_options=bson.codec_options.CodecOptions(document_class=bson.son.SON)) cursor=coll.find({'ts':{'$gte':start_optime}}, cursor_type=pymongo.cursor.CursorType.TAILABLE, oplog_replay=True, no_cursor_timeout=True) whileTrue: try: oplog=cursor.next() process(oplog) exceptStopException: #没有更多的Oplog数据 time.sleep(1)
cursor_type使用TAILABLE或者TAILABLE_AWAIT,使用后一种类型时,如果没有更多的Oplog数据,则这次请求会阻塞等待有Oplog数据或者到达等待的时间超时返回。
设置oplog_replay标记可以表示此次请求的类型是保存Oplog的cappedcollection,提供ts筛选参数,进行查询优化。
获取到Oplog之后,就可以做数据同步或者分发到感兴趣的消费者作特殊分析,如MongoShake工具。
参考了文档:
ReplicaSetOplog:https://docs.mongodb.com/manual/core/replica-set-oplog/
MongoDBoplog漫谈:http://caosiyang.github.io/2016/12/24/mongodb-oplog/
MongoDB复制集原理:https://www.nhooo.com/article/166148.htm
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。