hyperledger fabric orderer源码分析
## 简介
数字广告领域当前面临不少痛点问题,如广告欺诈、广告投放环节不透明,行业数据不通等等。而区块链具有不可篡改、可追溯性等特性。如果两者可以结合起来,是否可以解决数字广告领域这些痛点问题?为了探索区块链在数字广告领域的应用,我们结合目前业界比较著名的开源区块链项目hyperledger fabric进行尝试。
### hyperledger简介
Hyperledger 创立于2016年,由30个创始公司会员和一套技术和组织治理机构组成。目前已经发展到超过250多个机构成员。Hyperledger孵化并推广一系列业务区块链技术,包括分布式账本框架,智能合约引擎,客户端库,图形界面,实用程序库和示例应用程序 。以下是超级账本管理的商业区块链框架和工具:
![006tNbRwly1fxl8jvpij9j323d0u0gpn.jpg](https://tech.voiceads.cn/usr/uploads/2019/04/1248132681.jpg)
### hyperledger fabric简介
Hyperledger Fabric是hyperledger中最出名的一个子项目。Fabric定位是面向企业的分布式账本平台,创新的引入了权限管理支持,设计上支持可插拔、可扩展,是首个面向联盟链场景的开源项目。fabric具有模块化架构,因此fabric允许组件(例如共识和成员服务)即插即用。下图展示了fabric的逻辑架构图:
![20160708125650858](https://ws2.sinaimg.cn/large/006tNbRwly1fxl8gk03g2j30ki0cagn7.jpg)
> 引用自《深度探索区块链:hyperledger技术与应用》
>
> #### 成员管理
>
> MSP(Membership Service Providers)对成员管理进行了抽象,每个MSP都会简历一套根新人证书,利用PKI对成员身份进行认证,验证成员用户请求提交的签名。结合Fabric-CA或者第三方CA系统,提供成员注册功能,并对成员身份证书进行管理,例如证书新增和撤销。注册的证书分为注册证书、交易证书、TLS证书,他们分别用于用户身份、交易签名和TLS传输。
>
> #### 共识服务
>
> 在分布式节点环境下,要实现通一个链上不同节点区块的一致性,同时要确保区块里的交易有效和有序。共识机制由3个阶段完成:客户端向背书节点提交提案进行签名背书,客户端将背书后的交易提交给排序节点进行交易排序,生成区块和排序服务,之后广播给记账节点验证后写入本地账本。网络节点的P2P协议采用的是基于Gossip的数据分发,以同一组织为传播范围来同步数据,提升网络效率。
>
> #### 链码服务
>
> 智能合约的实现依赖于安全的执行环境,确保安全的执行过程和用户数据的隔离。Hyperledger fabric采用Docker管理普通链码,提供安全的沙箱环境和镜像文件仓库。其好处是容易支持多种语言的链码,扩展性好。
### orderer简介
orderer主要提供了如上描述的共识服务,`hyperledger fabric 1.0`运行时架构:
![image-20181126114348870](https://ws1.sinaimg.cn/large/006tNbRwly1fxlbfat7foj316u0pm0vv.jpg)
整个过程大致如下:
1. client上的CLI或者SDK经过CA系统认证,连接进入到区块链网络当中。
2. 由client上的CLI或者SDK进行proposal议案的提出。client会依据智能合约chaincode根据背书策略endorse policy决定把proposal发往哪些背书的peer节点,而peer节点进行投票,client汇总各背书节点的结果;
3. client将获得多数同意的议案连同各peer的背书(包括其投票结果以及背书签名)交给orderring service,而orderer会汇总各client递交过来的trasaction交易,排序、打包。
4. orderer将交易打包成区块block,然后通知所有commit peer,各peer各自验证结果,最后将区块block记录到自己的ledger账本中。
5. client上的CLI或者SDK可以通过接口查询上述已经生成的区块。
参考[yeasy](https://github.com/yeasy)的图,非常形象的展示了基于`kafka`的排序核心服务流程:
![orderer_workflow](https://ws2.sinaimg.cn/large/006tNbRwly1fxd57t43bmj30x50p0417.jpg)
本篇基于`hyperledger fabric 1.0`,对orderer的源码进行分析。
## orderer启动过程
1. 初始化log
1. 如设置日志输出格式。
2. 默认日志输出级别,默认为info级别。
3. 设置模块对应的日志输出级别,默认为info级别。
4. 是否开启kafka详细日志,如果开启设置kafka详细日志输出到Stdout。
2. 启动profile服务,启动一个单独的协程,开启端口,提供http profile服务。
3. 初始化grpc,主要过程包含开启tcp端口,加载TLS通信证书
4. 初始化MSP
5. 初始化多通道管理服务,
1. 创建账本工厂,用来管理多账本,有三种类型的账本,file(leveldb)、json、ram,后两种均不建议在生产环境中使用。
2. 初始化配置好的创世区块,从该区块中读取对应的通道名称,将创世区块添加到对应通道的账本当中
3. 查看chains目录下已经存在的账本名称,如果存在,说明不是首次启动(理论上来说是一个常驻服务,可能异常退出等情况),下一步将通过已经存在的文件恢复账本服务。
6. 创建grpc服务,提供两个接口服务,分别是`Broadcast`和`Deliver`
7. 启动grpc服务,对外提供接口服务
## orderer接口分析
### 协议定义
协议定义在`hyperledger/fabric/protos/orderer/ab.proto`文件中
```protobuf
service AtomicBroadcast {
// broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure
rpc Broadcast(stream common.Envelope) returns (stream BroadcastResponse) {}
// deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a mashaled SeekInfo message, then a stream of block replies is received.
rpc Deliver(stream common.Envelope) returns (stream DeliverResponse) {}
}
```
服务描述了两个接口,一个是`Broadcast`接口,另一个是`Deliver`接口。
该服务定义接口采用的是`gRPC`的`stream`模式,因此对于客户端和服务端来说可以流式的收发数据。
### Broadcast接口
`Broadcast`接口是grpc形式的一个服务接口,该接口实现对交易消息的排序处理。
`Broadcast`是一个stream类型的接口,接口的proto定义如下:
```protobuf
rpc Broadcast(stream common.Envelope) returns (stream BroadcastResponse) {}
```
`Oderer`首先接收到消息,然后消息处理步骤为:
1. `/orderer/server.go`中的`server`的接口`Broadcast(srv ab.AtomicBroadcast_DeliverServer) error`
2. `orderer/common/broadcast/broadcast.go`的`handlerImpl`接口`Handle(srv ab.AtomicBroadcast_DeliverServer) error`
该函数执行过程如下:
1. 接收消息,将消息包进行解析
2. 解析消息包头信息,获取到消息类型
3. 如果是`HeaderType_CONFIG_UPDATE`类型,进入处理配置更新的流程
4. 通过过滤器检查消息是否合格,不合格的将过滤掉
5. 接下来调用对应排序类型(solo或者kafka)接口,将交易进行排序。
6. 发送响应消息
7. 不断循环1-6过程,直到遇到错误退出。
#### 配置更新
如果是`HeaderType_CONFIG_UPDATE`类型的消息,则会将消息经过`bh.sm.Process(msg)`,实际是调用`orderer/configupdate/configupdate.go`中的`Process`对消息进行进一步的加工,将消息加工成一个orderer可以处理的交易消息:
1. 创建新通道的配置
2. 对已存在的通道更新配置
#### 服务类型
查看`broadcast.Support`,主要实现了两个接口:
```go
// Support provides the backing resources needed to support broadcast on a chain
type Support interface {
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
Enqueue(env *cb.Envelope) bool
// Filters returns the set of broadcast filters for this chain
Filters() *filter.RuleSet
}
```
接下来主要分析`solo`和`kafka`中的实现。
##### solo
代码在`orderer/solo/consensus.go`中,`main`函数。
###### 主要流程
主要流程为:新消息处理,同时检查是否出块;超时检查是否进行出块。
一直不断的循环,接收三种消息:
1. 新交易消息
2. 定时器消息
3. 退出消息
下面详细分析1和2两个处理流程。
###### 新交易消息处理流程
接收到一个消息时,判断是否是该batches接收第一条消息。是的话需要添加一个定时器。
可以这么理解:为每个batches添加一个定时器,在该batches接收第一条消息是添加。当该batches被写入区块时,将该定时器销毁。
这里有个问题:
1. 设置定时器的超时时间是多少?
2. batches怎么生成,消息接收的详细流程
###### 定时器消息处理流程
定时器一到,调用Cut,判断是否存在batch,如果不存在结束本次循环,如果存在batch,创建区块。
这里也有一个疑问:Cut函数详细的处理过程?
###### receiver实现
`orderer/common/blockcutter/blockcutter.go`中的`receiver`实现了`Receiver`的两个接口。
```go
type Receiver interface {
Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, committers [][]filter.Committer, validTx bool, pending bool)
Cut() ([]*cb.Envelope, []filter.Committer)
}
```
下面我们主要分析`Ordered`接口,对于一个消息来说,可能会触发以下三种操作:
1. 当前消息正常,不需要cut。
2. 当前消息正常,需要cut。
3. 消息不正常。
什么时候需要cut?
1. 当前消息的大小大于配置的`PreferredMaxBytes`,处理结束。
2. 未cut的消息加当前消息大小大于`PreferredMaxBytes`。
3. 在2的处理之后,再判断未cut的消息加当前消息大小大于`MaxMessageCount`。
##### kafka
对应的实现在`orderer/kafka/chain.go`中。
对于一个通道而言,如果采用kafka,为每个通道只创建一个Partition,这样对于所有分布式节点而言,所有的序列都是一致的。
`Equeue`接口从客户端接收消息,作为生产者,将消息写入到kafka当中;另一方面启动时,单独启动一个goroutine用来处理消费消息。主要处理三种类型的消息消息。
第一类是异常类消息,如:
1. 退出消息,执行退出。
2. 通道消费异常,重新连接kafka。
第二类是kafka消息,对于从kafka中消费的消息,有三种类型:
1. `KafkaMessage_Connect`,连接信息,忽略不需要处理。
2. `KafkaMessage_TimeToCut`,定时器消息,超时出块,如果满足要求,将进行出块。
3. `KafkaMessage_Regular`,常规交易消息,和`solo`中的过程非常类似,调用`receiver`的`Ordered`接口,成功返回之后,判断是否需要设置定时器;接下来对`Ordered`接口返回的`batches`进行出块。
第三类是定时器消息,定时器在设定的时间内触发时,会发送一条`KafkaMessage_TimeToCut`类型的消息到`kafka`当中。
##### PBFT
在`hyperledger fabric 1.0`版本中,暂时还未实现。
### Deliver接口
`diliver`接口是grpc形式的一个服务接口,该接口实现了区块的同步,即将区块信息发送给请求端。
而同步这个过程是响应的client来主动获取的,也就是说是pull的方式,而不是push主动通知。
`Deliver`是一个stream类型的接口,接口的proto定义如下:
```protobuf
rpc Deliver(stream common.Envelope) returns (stream DeliverResponse) {}
```
`Oderer`首先接收到消息,然后消息处理步骤为
1. `/orderer/server.go`中的`server`的接口`Deliver(srv ab.AtomicBroadcast_DeliverServer) error`
2. `orderer/common/deliver/deliver.go`的`deliverServer`接口`Handle(srv ab.AtomicBroadcast_DeliverServer) error`
最终对应的实现在`orderer/common/diliver/diliver`中的`deliverServer`对应的`Handle`函数中。
该函数大致步骤如下:
1. 接收消息,将消息包进行解析。
2. 解析消息包头部信息,获取到channelid。
3. 从消息体中解析获取区块的位置,检查其合法性。
4. 读取发送区块信息,并且将区块发送给请求端。
5. 处理发送区块成功之后,最后再发送成功消息。
请求端请求的对应的区块结束位置,有三种类型
- `SeekPosition_Oldest`
- 截止到最早的区块,应该就是从起始开始第一块
- `SeekPosition_Newest`
- 截止到最新区块
- `SeekPosition_Specified`
- 截止到获取到指定区块
接下来主动不断循环迭代区块,并且将区块发送给请求端,直到以上获取的截止位置。
## 调试orderer
可以直接到`hyperledger/fabric/orderer`目录下执行`go build`编译生成orderer的二进制文件。并且可以启动运行它。可以`fabric/orderer/sample_clients`使用其中包含的两个测试程序来测试。
本期我们对orderer有了一个基本的认识,接下来为了更加直观的认识orderer,将对orderer进行部署调试。
## 参考资料
1. [Hyperledger Fabric Ordering Service](https://github.com/hyperledger/fabric/blob/release-1.0/orderer/README.md)
2. [排序服务核心原理和工作过程](https://github.com/yeasy/hyperledger_code_fabric/blob/master/process/orderer_workflow.md)
3. [深度探索区块链:Hyperledger技术与应用](https://book.douban.com/subject/30127741/ )
4. [区块链开源实现HYPERLEDGER FABRIC架构详解](https://www.taohui.pub/2018/05/26/%E5%8C%BA%E5%9D%97%E9%93%BE%E5%BC%80%E6%BA%90%E5%AE%9E%E7%8E%B0hyperledger-fabric%E6%9E%B6%E6%9E%84%E8%AF%A6%E8%A7%A3/)