首页>新闻>新闻详情>

死磕以太坊源码分析之MPT树-下

2021-01-06 13:00:51

死磕以太坊源码分析之MPT树-下

文章以及资料请查看:https://github.com/blockchainGuide/

上篇主要介绍了以太坊中的MPT树的原理,这篇主要会对MPT树涉及的源码进行拆解分析。trie模块主要有以下几个文件:

|-encoding.go 主要讲编码之间的转换
|-hasher.go 实现了从某个结点开始计算子树的哈希的功能
|-node.go 定义了一个Trie树中所有结点的类型和解析的代码
|-sync.go 实现了SyncTrie对象的定义和所有方法
|-iterator.go 定义了所有枚举相关接口和实现
|-secure_trie.go 实现了SecureTrie对象
|-proof.go 为key构造一个merkle证明
|-trie.go Trie树的增删改查
|-database.go 对内存中的trie树节点进行引用计数
实现概览

encoding.go

这个主要是讲三种编码(KEYBYTES encodingHEX encodingCOMPACT encoding)的实现与转换,trie中全程都需要用到这些,该文件中主要实现了如下功能:

  1. hex编码转换为Compact编码:hexToCompact()
  2. Compact编码转换为hex编码:compactToHex()
  3. keybytes编码转换为Hex编码:keybytesToHex()
  4. hex编码转换为keybytes编码:hexToKeybytes()
  5. 获取两个字节数组的公共前缀的长度:prefixLen()
func hexToCompact(hex []byte) []byte {
    terminator := byte(0)
    if hasTerm(hex) { //检查是否有结尾为0x10 => 16
        terminator = 1 //有结束标记16说明是叶子节点
        hex = hex[:len(hex)-1] //去除尾部标记
    }
    buf := make([]byte, len(hex)/2+1) // 字节数组
    
    buf[0] = terminator << 5 // 标志byte为00000000或者00100000
    //如果长度为奇数,添加奇数位标志1,并把第一个nibble字节放入buf[0]的低四位
    if len(hex)&1 == 1 {
        buf[0] |= 1 << 4 // 奇数标志 00110000
        buf[0] |= hex[0] // 第一个nibble包含在第一个字节中 0011xxxx
        hex = hex[1:]
    }
    //将两个nibble字节合并成一个字节
    decodeNibbles(hex, buf[1:])
    return buf
  
//compact编码转化为Hex编码
func compactToHex(compact []byte) []byte {
    base := keybytesToHex(compact)
    base = base[:len(base)-1]
     // apply terminator flag
    // base[0]包括四种情况
    // 00000000 扩展节点偶数位
    // 00000001 扩展节点奇数位
    // 00000010 叶子节点偶数位
    // 00000011 叶子节点奇数位

    // apply terminator flag
    if base[0] >= 2 {
       //如果是叶子节点,末尾添加Hex标志位16
        base = append(base, 16)
    }
    // apply odd flag
    //如果是偶数位,chop等于2,否则等于1
    chop := 2 - base[0]&1
    return base[chop:]
}
//compact编码转化为Hex编码
func compactToHex(compact []byte) []byte {
    base := keybytesToHex(compact)
    base = base[:len(base)-1]
     // apply terminator flag
    // base[0]包括四种情况
    // 00000000 扩展节点偶数位
    // 00000001 扩展节点奇数位
    // 00000010 叶子节点偶数位
    // 00000011 叶子节点奇数位

    // apply terminator flag
    if base[0] >= 2 {
       //如果是叶子节点,末尾添加Hex标志位16
        base = append(base, 16)
    }
    // apply odd flag
    //如果是偶数位,chop等于2,否则等于1
    chop := 2 - base[0]&1
    return base[chop:]
}
// 将十六进制的bibbles转成key bytes,这只能用于偶数长度的key
func hexToKeybytes(hex []byte) []byte {
    if hasTerm(hex) {
        hex = hex[:len(hex)-1]
    }
    if len(hex)&1 != 0 {
        panic("can't convert hex key of odd length")
    }
    key := make([]byte, (len(hex)+1)/2)
    decodeNibbles(hex, key)
    return key
}


// 返回a和b的公共前缀的长度
func prefixLen(a, b []byte) int {
    var i, length = 0, len(a)
    if len(b) < length {
        length = len(b)
    }
    for ; i < length; i++ {
        if a[i] != b[i] {
            break
        }
    }
    return i
}


node.go

四种节点

node 接口分四种实现: fullNode,shortNode,valueNode,hashNode,其中只有 fullNode 和 shortNode 可以带有子节点。

type (
    fullNode struct {
        Children [17]node // 分支节点
        flags    nodeFlag
    }
    shortNode struct { //扩展节点
        Key   []byte
        Val   node //可能指向叶子节点,也可能指向分支节点。
        flags nodeFlag
    }
    hashNode  []byte
    valueNode []byte // 叶子节点值,但是该叶子节点最终还是会包装在shortNode中
)

trie.go

Trie对象实现了MPT树的所有功能,包括(key, value)对的增删改查、计算默克尔哈希,以及将整个树写入数据库中。

iterator.go

nodeIterator提供了遍历树内部所有结点的功能。其结构如下:此结构体是在trie.go定义的

type nodeIterator struct {
    trie.NodeIterator
    t   *odrTrie
    err error
}

里面包含了一个接口NodeIterator,它的实现则是由iterator.go来提供的,其方法如下:

func (it *nodeIterator) Next(descend bool) bool 
func (it *nodeIterator) Hash() common.Hash 
func (it *nodeIterator) Parent() common.Hash 
func (it *nodeIterator) Leaf() bool 
func (it *nodeIterator) LeafKey() []byte 
func (it *nodeIterator) LeafBlob() []byte 
func (it *nodeIterator) LeafProof() [][]byte 
func (it *nodeIterator) Path() []byte {}
func (it *nodeIterator) seek(prefix []byte) error 
func (it *nodeIterator) peek(descend bool) (*nodeIteratorState, *int, []byte, error) 
func (it *nodeIterator) nextChild(parent *nodeIteratorState, ancestor common.Hash) (*nodeIteratorState, []byte, bool) 
func (it *nodeIterator) push(state *nodeIteratorState, parentIndex *int, path []byte) 
func (it *nodeIterator) pop() 

NodeIterator的核心是Next方法,每调用一次这个方法,NodeIterator对象代表的当前节点就会更新至下一个节点,当所有结点遍历结束,Next方法返回false

生成NodeIterator结口的方法有以下3种:

①:Trie.NodeIterator(start []byte)

通过start参数指定从哪个路径开始遍历,如果为nil则从头到尾按顺序遍历。

②:NewDifferenceIterator(a, b NodeIterator)

当调用NewDifferenceIterator(a, b NodeIterator)后,生成的NodeIterator只遍历存在于 b 但不存在于 a 中的结点。

③:NewUnionIterator(iters []NodeIterator)

当调用NewUnionIterator(its []NodeIterator)后,生成的NodeIterator遍历的结点是所有传入的结点的合集。

database.go

Databasetrie模块对真正数据库的缓存层,其目的是对缓存的节点进行引用计数,从而实现区块的修剪功能。主要方法如下:

func NewDatabase(diskdb ethdb.KeyValueStore) *Database
func NewDatabaseWithCache(diskdb ethdb.KeyValueStore, cache int) *Database 
func (db *Database) DiskDB() ethdb.KeyValueReader
func (db *Database) InsertBlob(hash common.Hash, blob []byte)
func (db *Database) insert(hash common.Hash, blob []byte, node node)
func (db *Database) insertPreimage(hash common.Hash, preimage []byte)
func (db *Database) node(hash common.Hash) node
func (db *Database) Node(hash common.Hash) ([]byte, error)
func (db *Database) preimage(hash common.Hash) ([]byte, error)
func (db *Database) secureKey(key []byte) []byte
func (db *Database) Nodes() []common.Hash
func (db *Database) Reference(child common.Hash, parent common.Hash)
func (db *Database) Dereference(root common.Hash)
func (db *Database) dereference(child common.Hash, parent common.Hash)
func (db *Database) Cap(limit common.StorageSize) error
func (db *Database) Commit(node common.Hash, report bool) error

security_trie.go

可以理解为加密了的trie的实现,ecurity_trie包装了一下trie树, 所有的key都转换成keccak256算法计算的hash值。同时在数据库里面存储hash值对应的原始的key
但是官方在代码里也注释了,这个代码不稳定,除了测试用例,别的地方并没有使用该代码。

proof.go

  • Prove():根据给定的key,在trie中,将满足key中最大长度前缀的路径上的节点都加入到proofDb(队列中每个元素满足:未编码的hash以及对应rlp编码后的节点)
  • VerifyProof():验证proffDb中是否存在满足输入的hash,和对应key的节点,如果满足,则返回rlp解码后的该节点。
实现细节

Trie对象的增删改查

①:Trie树的初始化

如果root不为空,就通过resolveHash来加载整个Trie树,如果为空,就新建一个Trie树。

func New(root common.Hash, db *Database) (*Trie, error) {
    if db == nil {
        panic("trie.New called without a database")
    }
    trie := &Trie{
        db: db,
    }
    if root != (common.Hash{}) && root != emptyRoot {
        rootnode, err := trie.resolveHash(root[:], nil)
        if err != nil {
            return nil, err
        }
        trie.root = rootnode
    }
    return trie, nil
}

②:Trie树的插入

首先Trie树的插入是个递归调用的过程,它会从根开始找,一直找到合适的位置插入。

func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error)

参数说明:

  • n: 当前要插入的节点
  • prefix: 当前已经处理完的key(节点共有的前缀)
  • key: 未处理完的部分key,完整的key = prefix + key
  • value:需要插入的值

返回值说明:

  • bool : 操作是否改变了Trie树(dirty)
  • Node :插入完成后的子树的根节点

接下来就是分别对shortNodefullNodehashNodenil 几种情况进行说明。

2.1:节点为nil

空树直接返回shortNode, 此时整颗树的根就含有了一个shortNode节点。

case nil:
        return true, &shortNode{key, value, t.newFlag()}, nil

2.2 :节点为shortNode

  • 首先计算公共前缀,如果公共前缀就等于key,那么说明这两个key是一样的,如果value也一样的(dirty == false),那么返回错误。

  • 如果没有错误就更新shortNode的值然后返回

  • 如果公共前缀不完全匹配,那么就需要把公共前缀提取出来形成一个独立的节点(扩展节点),扩展节点后面连接一个branch节点,branch节点后面看情况连接两个short节点。

  • 首先构建一个branch节点(branch := &fullNode{flags: t.newFlag()}),然后再branch节点的Children位置调用t.insert插入剩下的两个short节点

matchlen := prefixLen(key, n.Key)
        if matchlen == len(n.Key) {
            dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value)
            if !dirty || err != nil {
                return false, n, err
            }
            return true, &shortNode{n.Key, nn, t.newFlag()}, nil
        }
        branch := &fullNode{flags: t.newFlag()}
        var err error
        _, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val)
        if err != nil {
            return false, nil, err
        }
        _, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value)
        if err != nil {
            return false, nil, err
        }
        if matchlen == 0 {
            return true, branch, nil
    }
        return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil

2.3: 节点为fullNode

节点是fullNode(也就是分支节点),那么直接往对应的孩子节点调用insert方法,然后把对应的孩子节点指向新生成的节点。

dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value)
        if !dirty || err != nil {
            return false, n, err
        }
        n = n.copy()
        n.flags = t.newFlag()
        n.Children[key[0]] = nn
        return true, n, nil

2.4: 节点为hashnode

暂时还在数据库中的节点,先调用 t.resolveHash(n, prefix)来加载到内存,然后调用insert方法来插入。

rn, err := t.resolveHash(n, prefix)
        if err != nil {
            return false, nil, err
        }
        dirty, nn, err := t.insert(rn, prefix, key, value)
        if !dirty || err != nil {
            return false, rn, err
        }
        return true, nn, nil

③:Trie树查询值

其实就是根据输入的hash,找到对应的叶子节点的数据。主要看TryGet方法。

参数:

  • origNode:当前查找的起始node位置
  • key:输入要查找的数据的hash
  • pos:当前hash匹配到第几位
func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) {
    switch n := (origNode).(type) {
    case nil: //表示当前trie是空树
        return nil, nil, false, nil
    case valueNode: ////这就是我们要查找的叶子节点对应的数据
        return n, n, false, nil
    case *shortNode: ////在叶子节点或者扩展节点匹配
        if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) {
            return nil, n, false, nil
        }
        value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key))
        if err == nil && didResolve {
            n = n.copy()
            n.Val = newnode
        }
        return value, n, didResolve, err
    case *fullNode://在分支节点匹配
        value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1)
        if err == nil && didResolve {
            n = n.copy()
            n.Children[key[pos]] = newnode
        }
        return value, n, didResolve, err
    case hashNode: //说明当前节点是轻节点,需要从db中获取
        child, err := t.resolveHash(n, key[:pos])
        if err != nil {
            return nil, n, true, err
        }
        value, newnode, _, err := t.tryGet(child, key, pos)
        return value, newnode, true, err
...
}

didResolve用于判断trie树是否会发生变化,tryGet()只是用来获取数据的,当hashNodedb中获取该node值后需要更新现有的trie,didResolve就会发生变化。其他就是基本的递归查找树操作。

④:Trie树更新值

更新值,其实就是调用insert方法进行操作。

到此Trie树的增删改查就讲解的差不多了。

将节点写入到Trie的内存数据库

如果要把节点写入到内存数据库,需要序列化,可以先去了解下以太坊的Rlp编码。这部分工作由trie.Commit()完成,当trie.Commit(nil),会执行序列化和缓存等操作,序列化之后是使用的Compact Encoding进行编码,从而达到节省空间的目的。

func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
    if t.db == nil {
        panic("commit called on trie with nil database")
    }
    hash, cached, err := t.hashRoot(t.db, onleaf)
    if err != nil {
        return common.Hash{}, err
    }
    t.root = cached
    return common.BytesToHash(hash.(hashNode)), nil
}

上述代码大概讲了这些:

  • 每次执行Commit(),该trie的cachegen就会加 1
  • Commit()方法返回的是trie.root所指向的nodehash(未编码)
  • 其中的hashRoot()方法目的是返回trie.root所指向的node的hash以及每个节点都带有各自hash的trie树的root
//为每个node生成一个hash
func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) {
    if t.root == nil {
        return hashNode(emptyRoot.Bytes()), nil, nil
    }
    h := newHasher(onleaf)
    defer returnHasherToPool(h)
    return h.hash(t.root, db, true) //为每个节点生成一个未编码的hash
}

hashRoot的核心方法就是 h.hash,它返回了头节点的hash以及每个子节点都带有hash的头节点(Trie.root指向它),大致做了以下几件事:

①:如果我们不存储节点,而只是哈希,则从缓存中获取数据

if hash, dirty := n.cache(); hash != nil {
        if db == nil {
            return hash, n, nil
        }
        if !dirty {
            switch n.(type) {
            case *fullNode, *shortNode:
                return hash, hash, nil
            default:
                return hash, n, nil
            }
        }
    }

②:递归调用h.hashChildren,求出所有的子节点的hash值,再把原有的子节点替换成现在子节点的hash

2.1:如果节点是shortNode

首先把collapsed.Key从Hex Encoding 替换成 Compact Encoding, 然后递归调用hash方法计算子节点的hashcache,从而把子节点替换成了子节点的hash

collapsed, cached := n.copy(), n.copy()
        collapsed.Key = hexToCompact(n.Key)
        cached.Key = common.CopyBytes(n.Key)

        if _, ok := n.Val.(valueNode); !ok {
            collapsed.Val, cached.Val, err = h.hash(n.Val, db, false)
            if err != nil {
                return original, original, err
            }
        }
        return collapsed, cached, nil

2.2:节点是fullNode

遍历每个子节点,把子节点替换成子节点的Hash值,否则的化这个节点没有children。直接返回。

        collapsed, cached := n.copy(), n.copy()

        for i := 0; i < 16; i++ {
            if n.Children[i] != nil {
                collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false)
                if err != nil {
                    return original, original, err
                }
            }
        }
        cached.Children[16] = n.Children[16]
        return collapsed, cached, nil

③:存储节点n的哈希值,如果我们指定了存储层,它会写对应的键/值对

store()方法主要就做了两件事:

  • rlp序列化collapsed节点并将其插入db磁盘中
  • 生成当前节点的hash
  • 将节点哈希插入db

3.1:空数据或者hashNode,则不处理

if _, isHash := n.(hashNode); n == nil || isHash {
        return n, nil
    }

3.2:生成节点的RLP编码

h.tmp.Reset()                                 // 缓存初始化
    if err := rlp.Encode(&h.tmp, n); err != nil { //将当前node序列化
        panic("encode error: " + err.Error())
    }
    if len(h.tmp) < 32 && !force {
        return n, nil // Nodes smaller than 32 bytes are stored inside their parent 编码后的node长度小于32,若force为true,则可确保所有节点都被编码
    }
//长度过大的,则都将被新计算出来的hash取代
    hash, _ := n.cache() //取出当前节点的hash
    if hash == nil {
        hash = h.makeHashNode(h.tmp) //生成哈希node
    }

3.3:将Trie节点合并到中间内存缓存中

hash := common.BytesToHash(hash)
        db.lock.Lock()
        db.insert(hash, h.tmp, n)
        db.lock.Unlock()
        // Track external references from account->storage trie
        //跟踪帐户->存储Trie中的外部引用
        if h.onleaf != nil {
            switch n := n.(type) {
            case *shortNode:
                if child, ok := n.Val.(valueNode); ok {  //指向的是分支节点
                    h.onleaf(child, hash) //用于统计当前节点的信息,比如当前节点有几个子节点,当前有效的节点数
                }
            case *fullNode:
                for i := 0; i < 16; i++ {
                    if child, ok := n.Children[i].(valueNode); ok {
                        h.onleaf(child, hash)
                    }
                }
            }
        }

到此为止将节点写入到Trie的内存数据库就已经完成了。

如果觉得文章不错可以关注公众号:区块链技术栈,详细的所有以太坊源码分析文章内容以及代码资料都在其中。

Trie树缓存机制

Trie树的结构里面有两个参数, 一个是cachegen,一个是cachelimit。这两个参数就是cache控制的参数。 Trie树每一次调用Commit方法,会导致当前的cachegen增加1。

func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) {
   ...
    t.cachegen++
   ...
}

然后在Trie树插入的时候,会把当前的cachegen存放到节点中。

func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
            ....
            return true, &shortNode{n.Key, nn, t.newFlag()}, nil
}
func (t *Trie) newFlag() nodeFlag {
    return nodeFlag{dirty: true, gen: t.cachegen}
}

如果 trie.cachegen - node.cachegen > cachelimit,就可以把节点从内存里面拿掉。 也就是说节点经过几次Commit,都没有修改,那么就把节点从内存里面干掉。 只要trie路径上新增或者删除一个节点,整个路径的节点都需要重新实例化,也就是节点中的nodeFlag被初始化了。都需要重新更新到db磁盘。

拿掉节点过程在 hasher.hash方法中, 这个方法是在commit的时候调用。如果方法的canUnload方法调用返回真,那么就拿掉节点,如果只返回了hash节点,而没有返回node节点,这样节点就没有引用,不久就会被gc清除掉。 节点被拿掉之后,会用一个hashNode节点来表示这个节点以及其子节点。 如果后续需要使用,可以通过方法把这个节点加载到内存里面来。

func (h *hasher) hash(n node, db *Database, force bool) (node, node, error) {
    ....
       // 从缓存中卸载节点。它的所有子节点将具有较低或相等的缓存世代号码。
       cacheUnloadCounter.Inc(1)
  ...
}
参考&总结

这部分重要的内容也就上面讲述的,主要集中在Trie上面,如果有不对的地方,可以及时指正哦。

https://mindcarver.cn/about/

https://github.com/blockchainGuide/blockchainguide

热门交易所

相关币种

Ethereum
ETH+8.95%
1,390.57
Money Plant Token
MPT-1.61%
0.000622955

比特币蜘蛛BTCSpider.com是比特币新闻及数字货币信息的聚合站,每天追踪采集全球主流比特币等数字货币财经媒体、项目官网、交易平台的新鲜信息,实时更新海量资讯内容。