庄周梦蝶

生活、程序、未来

昨天、今天和明天

| Comments

这题目非常俗套,不过同名的本山大叔的小品我却是很喜欢,哪怕很多人说有歧视的嫌疑。

其实这篇博客就是吹吹水,总结下我过去 5 年做了哪些微小的工作,为什么加入蚂蚁金服以及加入蚂蚁金服准备做什么。

2012 年我从淘宝离职加入了 AVOS 中国公司,很多人看来不理解,为什么放着到手可得的股票而去加入一家前途未卜的创业公司?其实很简单,当时我在淘宝呆了三年,一直做消息中间件相关的工作,先是 Notify(蚂蚁这边叫 msgbroker ),后来是发现了 kafka,所以有了 MetaQ。。以当时的眼光看,MetaQ 很多东西都是更强大,功能更全面,比如高可靠的复制方案、事务消息乃至支持 XA 协议的分布式事务。但是做久了,确实腻歪,然后去转岗,最后半年去做 TAE(Taobao AppEngine)项目,第一次接触和引入了 Storm,当然,更重要的是结识了 Clojure 这门语言。顺其自然,国内玩 clojure 的人这么少,就认识了现在 LeanCloud 的 CEO 江宏,在杭州咖啡馆聊了一次,没过多久,我就决定加入了。所以说原因很简单,一是我想换个环境,当时的眼界和心境决定了我没法继续在阿里呆下去,我想离开工作了三年的熟悉环境,换一门新语言,换一个完全不同的工作环境,认识不同的人,尝试下创业是什么感受。二是江博士的个人影响力,我认同他的许多观点,也庆幸能参与到他组建的这个优秀团队。

如果以这些原因来看,我的目的完全达到了, AVOS 中国办公室当时是一个非常牛逼的团队,我认识了很多远比我优秀的工程师,无论是国内的,还是国外的,有一些现在还保持着很好的联系。我经历了数个产品的从生到死,并且全程参与到最终 LeanCloud 这三年来的成长和发展。这 5 年来我也基本一直在用 clojure 写代码,不敢说完全精通,但是至少可以吹牛逼地说,我是国内少数的几个有丰富 clojure 实践经验的工程师。更重要的是这 5 年来经历的一切,对我来说是完全全新的体验。创业非常艰难,同样,创业也非常激动人心。

AVOS 中国团队经历了好几个产品,美味书签开始,我们先是想尝试做 delicous 的中国版本,加上社交和设计类的元素。但是总所周知, delicous 已经死了。我这个阶段还是主要在后端,做一些文本抽取算法、后端服务之类的工作,也帮助 delicious 基于 Solr Cloud 做了新的搜索系统,解决数亿文档的搜索问题。在美味书签失败后,类似无觅这样的基于兴趣 tag 订阅的阅读类开始兴起(其实今日头条也是),我的同事孙宁开始主导美味爱读的开发,基本是参照着 Prismatic 来做,我又参与进去,记得是做后端 API 和分类算法,第一次接触了聚类推荐算法相关的东西。美味爱读真的是非常好的产品,可惜他的原型 Prismatic 也死了,不过他们留下很多美妙的 clojure 库,比如 schema。在美味爱读的开发的同时,另一拨同事做了玩拍这个短视频项目,因为当时短视频 Vine 非常火,我们“又”赶热潮去做了玩拍。玩拍的做的还是算是比较成功,火过一阵子,但是很可惜,仍然是没有坚持下去,加上没有持续的内容运营,更重要的是团队上大变动,反而当时对标的秒拍坚持到了现在。玩拍第一个版本的后端 API 是我设计和实现,后续因为参与美味爱读和 Delicious 重构,就退出了这个项目。然后同事朱老板一直很关注 parse.com,认为移动大潮下,这个东西还是有价值的,说要不我们做一个试试。LeanCloud 就这样横空出世。很夸张,但是却是现实, AVOS 中国公司的末期有三个产品在齐头并进。

总结前面这几个产品,本质上的问题还是整个团队没有一个明确的方向,像无头苍蝇一样乱串,但是这帮“苍蝇”很牛逼,做的产品还可以,可惜最终由于公司变动、团队短板上的问题,没有坚持下去,一些关键点的抉择都有值得复盘总结的地方。

后来就是 AVOS 美国那边也尝试了几个产品失败,基本确定要从中国撤离。留给个人的就两个选项,留下继续尝试刚开始做的 LeanCloud,或者离职。回想起来,我当时没有多少犹豫,因为 LeanCloud 早期的后端其实就我和另一个同事广成,我负责存储、网站、推送乃至云引擎(当时还叫云代码)的研发,广成负责实现一个友盟统计。然后 SDK 就一个朱老板,再加上前端长龙和一个设计MM,我们几个个人就这么开工了。LeanCloud 的早期就是这样,每个人负责一个大摊子,拼命地干活,我后来说这个阶段是我工作中的第二个高峰,印象中还记得北京街头凌晨的昏黄路灯 :D。

项目初步做完,上线,没有多少人用,因为 bug 多,功能也不稳定。这里说个趣事,一个外包小团队开始用我们,给央视的在人民大会堂的广告推介会做过一个抽奖环节,为了做现场技术支持,这是我第一次进入人民大会堂的宴会大厅,估计也是最后一次。因为团队很小,开发、测试、因为都要做。就算到现在, LeanCloud 从来都没有过一个专职的测试人员,所有的代码都是开发人员自己完成从研发到测试,乃至文档的所有工作。还有很长一段时间,开发同时还是客服,你可以想象这个压力和负担。当时 [fir.im] 的 CEO 王猛加入过我们一阵子,他也是我认识的人里少数佩服的几个同时有极强的编码能力和产品嗅觉,可惜最终由于他拿到了投资,自己创业去了。LeanCloud 的每个功能模块的开发人员都少而精干,这一点有利也有弊。但是整个团队真的非常棒,这是我呆过最好的团队,战斗力非常强,每个人都专业、努力,感恩、感谢。

慢慢地,推送和云引擎都有了专门的同事负责,孙宁和郭瑞负责推送聊天,原来淘宝的同事陈伟负责云引擎,他们都做的更棒更强大。而我都专注在存储后端和整个网站控制台服务上,以及一些杂七杂八的项目的研发和部分管理工作上。有一段时间,存储的故障频发,主要还是 Mongodb 的各种坑,一路趟着过来,从头梳理了所有服务,从 SDK 到后端各个服务层层隔离、流控、监控和告警,最终慢慢稳定下来。我们也从小作坊式的部署,迁移到了 mesos + marathon 的容器部署架构上,做到了一键部署和伸缩容量。为了推出腾讯云(现已关闭)和美国节点,设计了跨站点异地机房的架构,顺便支持了专有云的部署……整个阶段就是这么一个升级打怪的过程,碰到问题,解决问题,然后碰到新的问题。

一直到了今年,产品仍然有很多可以改善的地方,但是我觉的自己的作用已经到此为止了。我的能力无法再让我负责的东西再上一个台阶,而公司招人上也一直不尽如人意。移动的大潮已经过去,物联网的大潮还未来临,但是我一直认为 LeanCloud 仍然有许多值得努力的方向,比如物联网,比如容器云计算(基于云引擎平台),甚至做自己的文件存储 CDN 等等,不过我的使命已经完成。我现在的老大说我适合做从 0 到 1,而从 1 到 1.x 乃至 2.x ,却不是我所擅长和足以坚持的。回头想想这个过程,他的判断很到位。

离开 LeanCloud 加入蚂蚁金服的原因也不仅仅如此,它不是另一个疲倦期的结束,而是一些现实的压力,经济上的,个人方向上的。经济上,我需要一些收入,个人方向,我想再去做基础性的产品。记得当年初入编程这个行当,给自己立过一个看起来可笑的志向:编写出世界级的软件。十年过去,似乎离这个目标还很遥远。甚至什么是世界级软件的定义,也没有想的很明白。

来了蚂蚁,又回到一个忙碌的节奏,每天排的满满的读书和工作。这一段时间内还是做老本行——消息中间件,做各种改进和功能扩展,入职两个月来也算是干了点事情,突破了原来 RocketMQ 的一些瓶颈,并且尝试扩展 MQ 的功能范围,在这方面 kafka 是我们的榜样。后面,我也不排除去做一些其他事情,只要有趣有挑战,别人也需要,我都愿意去尝试下。其次,希望自己的能力下沉,突破原有的边界,深入到自己过去不敢或者不愿意了解的领域,看能不能做出一点新东西。

二进宫阿里,从心态上,放的很平,也更能放的开,这可能都是年纪的功劳。我就是来干活的,同时要赚点钱的,既然还能折腾,那就继续折腾。也欢迎有兴趣的朋友,跟我一起折腾,想到蚂蚁金服中间件的可以联系我邮箱 killme2008@gmail.com

自旋锁的优化

| Comments

自旋锁的提出主要也是为了解决多核系统的同步问题。当锁要保护的代码块执行很快的时候,并且争抢不是非常激烈的时候,自旋锁的比之重量级的需要切换上下文的互斥锁能带来更好的性能表现。对于非多核系统,用户态的自旋锁空耗 CPU,反而降低了整个系统的 progress,作用反而不大。

此外,当保护的代码块执行较为耗时,或者自旋锁的争抢非常激烈的时候,自旋锁本身就消耗了大量无谓的 CPU ,这种情况下还不如使用互斥锁,让出 CPU 给任务执行,提高实际的 CPU 利用率。

我在前面一篇介绍 CPU 高速缓存的博客,举了个例子用 AtomicInteger 的 compareAndSet 实现一个自旋锁,主要是为了演示在 SMP 下通过增加一个内循环,来减少锁状态 state 在多个 CPU 之间的『颠簸』。

但是其实这个例子如果用 synchronized 或者 ReentrantLock 改写,都会快得多。比如换成 ReentrantLock,

1
2
3
4
5
6
7
8
9
10
private ReentrantLock lock =  new ReentrantLock();

/**
 * 递增 count,使用 ReentrantLock 保护 count
 */
public void incr() {
    lock.lock();
    count = count + 1;
    lock.unlock();
}

运行时间下降到了 150~230 毫秒,比之原来测试的两个版本快了一个数量级。

原因就在于递增 +1 这个操作非常快,导致自旋锁版本的代码线程争抢锁非常激烈,大家抢的头破血流,但是任务却没有多大进展,空耗了大量 CPU。就好像一堆人抢着去上卫生间,大家都不排队,你挤我抢,反而堵在了门口,卫生间的实际利用率下降了。从这个生活中的例子出发,我们可以想到一些优化的方法,比如分散每个人的争抢时间,比如排队。

因此,自旋锁的优化有一些思路:

首先是退让(Back off),每个线程在争抢锁之前等待一小会,通常第一次不退让,但是之后就按照某个规则变化这个退让时间,我们修改下原来的 BusyLock 例子来演示这个优化,这里采用指数退让:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 //最大退让时间 10 毫秒
 private static final int max_backoff_msg=10;

     /**
     * 利用 CAS 实现自旋锁
     */
    private void lock() {
        //其实退让时间 1 毫秒。
        int backoff=1;
        while (!state.compareAndSet(0, 1)){
            while (state.get() == 1)
                ;
            //不是第一次,退让一下
            if(backoff>1){
                try{
                    Thread.sleep(backoff);
                }catch(Exception InterruptedException){
                    Thread.currentThread().interrupt();
                }
            }
            //还没有超过最大退让时间,指数增加退让时间。
            if(backoff<max_backoff_msg){
                backoff*=2;
            }
        }
    }

通过引入一个退让机制,我们重新跑下原来的测试,整体执行时间下降到了 120~200 毫秒了,这就跟 ReentrantLock 差不了太多了。退让时间可以是静态不变的,或者动态变化的,动态变化可以是这里的指数递增,或者随机生成。不同的策略也会带来不同的影响。

但是呢,这个实现是否比 ReentrantLock 的实现更好? 肯定不是。首先没有处理可重入锁,也不支持 tryLock 超时、取消等;其次,用的 sleep 做退让,最小单位是毫秒,并且 sleep 精度无法保证。另外,没有任何公平性可言,等待最久的线程可能退让最多,新来的很可能先抢到锁等等。

这就引出了另一个优化思路:排队(Queuing Lock)。这其实就是 ReentrantLock 的实现方式。ReentrantLock 有两种模式:公平和非公平,通常情况都推荐你使用非公平版本,也就是默认的。对于非公平版本的 lock 实现如下:

1
2
3
4
5
6
7
8
9
    * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

先尝试用 CAS 获取锁,如果成功了,那就设置独占线程为自己,如果失败,进入 AQS 框架的 aquire 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

aquire 里同样先尝试下能不能获取,不行就去调用 addWaiteracquireQueued 去尝试排队,如果还是不行,就中断取消。

排队过程也是一个 lock-free 的循环过程。acquireQueued 仍然会去自旋尝试获取自旋锁,如果仍然继续失败,就调用 park 让出 CPU 不再参与调度,等锁被释放的时候被前驱节点的线程(释放锁的线程)唤醒再次参与争抢。最后,通过引入队列禁止抢占,也可以支持严格的公平锁了。

关于 AQS 和 ReentrantLock 源码解析,网上资料多如牛毛,这里也不重复了。

排队模式的自旋锁目前有几种:基于票据(ticket),每个线程分配一个票据,轮到自己就获得锁;MCS 锁CLH Lock,都是基于链表, AQS 框架从一定程度上是 CLH 的变种实现。

关于更多自旋锁的优化请参考 Spin Lock Performance,更仔细的设计需要将硬件的多核模型考虑进入。

题外话:单纯看一个东西源码,只能了解一个实现是什么样的,但是不知道为什么这样实现,有时候需要换个角度去思考,并且阅读一些相关方面的资料,才能理解作者为何如此设计等等。

分布式一致性论文阅读阶段性小结

| Comments

这个月阅读集中在分布式一致性和存储方面。

首先是 Paxos 系列论文:

  • 《Paxos Made Simple》,循循渐进地讲解 paxos 解决的问题、逐步增强的约束条件(P1、P2、P2a – P2c)等,P1 保证至少有一个值被接受, P2 保证只有一个被选中的值被所有 process 接受。然后介绍两阶段的步骤:

    • Phase 1.
      • (a) A proposer selects a proposal number n and sends a prepare request with number n to a majority of acceptors. * (b) If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered pro- posal (if any) that it has accepted.
    • Phase 2.
      • (a) If the proposer receives a response to its prepare requests (numbered n) from a majority of acceptors, then it sends an accept request to each of those acceptors for a proposal numbered n with a value v, where v is the value of the highest-numbered proposal among the responses, or is any value if the responses reported no proposals. * (b) If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request having a number greater than n.

加粗的两句话就是 paxos 的核心,一个是 acceptor 的承诺,一个是 proposer 的选择。达成一致的值之后,接下来论文讨论了其他 process 学习机制和 liveness 问题,paxos 无法解决活锁问题,但是实际应用中这个问题可以通过选举一个唯一的 proposer 来避免。接下来谈到实现,说明一个 Replication State Machine 的实现问题,着重讨论了 leader failure 情况下的『空洞』问题,通过引入 no-op commands 来减少服务中断时间。但是这篇论文没有明确提到 multi paxos 的介绍,这就需要继续阅读其他论文和博客来理解。

  • 《Paxos Made live》。这篇论文是 Google 发表的,讨论了 paxos 的工程实践,也就是 chubby 这个众所周知的分布式服务的实现,可以结合《The Chubby lock service for loosely-coupled distributed systems》 一起看。实际应用中的难点,比如 master 租约实现、group membership 变化、Snapshot 加快复制和恢复以及实际应用中遇到的故障、测试等问题,特别是最后的测试部分。非常值得一读。《The Chubby lock service for loosely-coupled distributed systems》 更多介绍了 Chubby 服务本身的设计决策,为什么是分布式锁服务,为什么是粗粒度的锁,为什么是目录文件模式,事件通知、多机房部署以及应用碰到的使用问题等等。

  • Paxos Made Practical。这篇论文更详细地讨论了 State Machine 的实现,甚至还带上了 C语言的伪代码,定义了 prosper 、 acceptor 以及 SM 本身需要实现的接口和通讯协议,更重要的是讨论了 membership change 的问题,通过引入 view 视图的概念,介绍了 view-change 协议来解决成员变动问题(可能是故障或者上下线新成员),按我的理解,这个过程也是 paxos 的应用。最后介绍了可能的优化手段。

view change 协议 (view change 协议)

  • Paxos 我还着重推荐阅读微信后端团队写的系列博客,包括他们开源的 phxpaxos 实现,基本上将所有问题都讨论到了,并且通俗易懂。

其次,一致性方面另一块就是 Raft 算法,按照 Google Chubby 论文里的说法,

1
2
Indeed, all working protocols for asynchronous consensus we have so far 
encountered have Paxos at their core.

但是 Raft 真的好理解多了,我读的是《In Search of an Understandable Consensus Algorithm》,论文写到这么详细的步骤,你不想理解都难。毕竟 Raft 号称就是一个 Understandable Consensus Algorithm。无论从任何角度,都推荐阅读这一篇论文。

首先能理解 paxos 的一些难点,其次是了解 Raft 的实现,加深对 Etcd 等系统的理解。这篇论文还有一个 250 多页的加强版《CONSENSUS: BRIDGING THEORY AND PRACTICE》,教你一行一行写出一个 Raft 实现,我还没有学习,有兴趣可以自行了解。Raft 通过明确引入 leader(其实 multi paxos 引申出来也有,但是没有这么明确的表述)来负责 client 交互和日志复制,将整个算法过程非常清晰地表达出来。Raft 的算法正确性的核心在于保证 Leader Completeness ,选举算法选出来的 leader 一定是包含了所有 committed entries 的,这是因为所有 committed entries 一定会在多数派至少一个成员里存在,所以设计的选举算法一定能选出来这么一个成员作为 leader。多数派 accept 应该说是一致性算法正确性的最重要的保证。

最后,我还读了《Building Consistent Transactions with Inconsistent Replication》,包括作者的演讲,作者也开放了源码。Google Spanner 基本是将 paxos 算法应用到了极致,但是毕竟不是所有公司都是这么财大气粗搞得起 TrueTime API,架得起全球机房,控制或者承受得了事务延时。这篇论文提出了另一个思路,论文介绍的算法分为两个层次: IR 和基于其他上的 TAPIR。前者就是 Inconsistent Replication,它将操作分为两类:

  • inconsistent: 可以任意顺序执行,成功执行的操作将持久化,支持 failover。
  • consensus:同样可以任意顺序执行并持久化 failover,但是会返回一个唯一的一致(consensus)结果。

IR 的调用图:

可见他需要服务端和客户端共同参与,对于 consensus 操作,如果 replicas 之间有冲突,会在客户端引入一个 decide 过程来决定使用哪一个值,相应地,在服务端为了解决 master 和 replicas 的不一致问题,引入了 sync/merge 过程来解决冲突,master 运行 merge 过程来解决 consensus 操作的副本冲突,而所有节点运行 sync 过程来同步 master 记录。关于 sync/master 的描述看原文:

1
2
3
4
5
6
7
8
9
10
11
12
"Some replicas may miss operations or need to reconcile their state if the 
consensus result chosen by the application (i.e., transaction) protocol does 
not match their result. To ensure that IR replicas eventually converge, they 
periodically synchronize. Similar to eventual consistency, IR relies on the 
application (i.e., transaction) protocol to reconcile inconsistent replicas.
 On synchronization, a single IR node first upcalls into the application 
 protocol with Merge, which takes records from inconsistent replicas and 
 merges them into a master record of successful operations and consensus 
 results. Then, IR upcalls into the application (i.e., transaction) protocol 
 with Sync at each replica. Sync takes the master record and reconciles 
 application (i.e., transaction) protocol state to make the replica consistent 
 with the chosen consensus results."

为了保证正确性,IR 对上层应用层协议有特殊的要求:

  • Invariant checks must be performed pairwise.也就是要求任意两个 consensus 操作,其中一个至少对另一个是可见的。不然无法检测是否冲突。
  • Application protocols must be able to change consensus operation results.对于已经达成一致的结果,还要允许是可以被修改的,merge 过程会修改原来认为的一致的结果,这是不一致复制必然带来的问题。
  • 性能原则 1:Application protocols should not expect operations to execute in the same order. 对于顺序不要有任何假设。
  • 性能原则 2:Application protocols should use cheaper inconsistent operations whenever possible rather than consensus operations. 尽量用 inconsistent 操作。比如在 TAPIR 里只有 prepare 是 consensus 类型操作。

正因为对于应用层协议有这么多的限制,因此论文提出了 TAPIR 这个算法来解决事务的 linearizable ordering 问题。TAPIR 的具体算法请阅读论文吧,这里不再复述。大体的思路就是客户端参与事务的冲突检测(OCC validation checks),Leader 执行IR 的 merge 过程,对于还没有committed 的事务(可能 abort ,也可能来不及提交),重新跑一遍 OCC 检测冲突,根据结果来决定最终是提交还是回滚。

对于复制和恢复的描述:

1
2
3
4
5
TAPIR’s sync function runs at the other replicas to reconcile TAPIR state with
 the master records, correcting missed operations or consensus results where 
 the replica did not agree with the group. It simply applies operations and
  consensus results to the replica’s state: it logs aborts and commits, and 
  prepares uncommitted transactions where the group responded PREPARE-OK.

传统两阶段提交, Google spanner 之类的思路:

TAPIR 的流程:

关于 TAPIR 的解读推荐两篇博客:Building Consistent Transactions with Inconsistent ReplicationPaper review: Building Consistent Transactions with Inconsistent Replication (SOSP’15)。 TAPIR 的源码只包含了 normal case 的处理,恢复之类的过程都是没有的,对于 recovery 的一些疑问,可以参考 A FEW WORDS ABOUT INCONSISTENT REPLICATION (IR),同样也是我的疑问,这在实际工程中是非常重要的部分,但是论文却是匆匆带过。

最近还回顾了不少分布式存储的论文, GFS、BigTable、Dynamo 等,改天再做个总结吧。

Join Idle Queue 负载均衡算法解析

| Comments

JIQ 是微软发的一篇论文《Join-Idle-Queue: A Novel Load Balancing Algorithm for Dynamically Scalable Web Services》里描述的负载均衡算法,这里总结下我所理解的内容。

背景

负载均衡很常见,比如我们经常用 nginx 做反向代理和负载均衡,Nginx 也支持了 weight、ip_hash、url_hash 等均衡算法。

负载均衡的图示:

任务 jobs 不停地经由多个 dispatcher 转发给后面的 processor server 处理。

dispatcher 选择哪一条 processor 来转发任务的过程就是 load balance 的核心问题。尽量降低任务的响应时间是我们的目标。

最简单的算法可能是随机或者轮询,但是这种简单的策略会造成响应时间的最大化,特别是高负载的情况下。

更优化的策略有:

  • JSQ: Join-the-Shortest-Queue,每次将任务加入最少任务队列的 server。这就要求 dispatcher 收集每个 processor server 的任务队列大小信息,但是随着 dispatcher 本身的集群化以及云服务厂商的大规模应用,这个收集产生的网络通讯就更加膨胀了。
  • SQ(d)(Power-of-d):每当任务到达的时候, dispatcher 就随机取样 d 个 processor 服务的队列大小,选择最小任务队列的那个派发。通常 d 选择为 2。这个算法相比随机算法能带来响应时间指数级别的提升。但是仍然需要在分发任务的时候获取 processor 队列信息,这个同步调用在任务派发的关键路径上,对性能有很大影响。
  • 工作窃取和共享:工作窃取就是空闲队列主动去从其他任务队列『窃取』任务,或者繁忙的队列主动将任务『推送』给其他空闲队列。这个算法更适合共享内存的系统,对于 web 负载均衡,在不同后端 server 之间做任务的窃取或者推送会带来额外的开销和复杂度,想象一个 web http 请求如何转交到另一台后端 server,涉及 TCP 链接的迁移和请求的同步等等。

JIQ 全称就是 Join-Idle-Queue,它的提出就是为了解决大规模 Web Services 的负载均衡问题。

算法描述

JIQ 顾名思义就是将任务加入空闲的队列。空闲队列的数据结构称之为 I-Queue(idle queue)。JIQ 将算法描述为两部分:primary load balancing 和 secondary load balancing,两者之间就通过 I-Queue 数据结构通讯。

  • 首要负载均衡(primary load balancing):每当一个任务到达, Dispatcher 就检查 I-Queue 是否为空。如果不为空,就移除第一个空闲的 processor,将任务交给它。如果为空,就采用随机选择的一个 processor 派发任务。
  • 次级负载均衡(secondary load balancing):是指空闲的 processor 选择加入哪一个 I-Queue 的过程。这里讨论了两种策略,其实是前面讨论的负载均衡策略的翻版:
    • JIQ-Random,也就是随机策略,Idle Processor 随机选择一个 I-Queue 来加入。
    • JIQ-SQ(d),Idle Processor 随机取样 d 个 I-Queue,选择最小长度的来加入。

一张图来描述就是这样:

JIQ 的好处很明显:

  • 在派发任务的关键路径上移除了 Dispatcher 和 Processor 同步通讯的开销,通过 I-Queue 这个队列结构异步解耦了。
  • 同时可以证明 JIQ 带来的复杂度没有超过 SQ(d) 算法(有数学证明),并且在响应时间的降低上极大地超过了 SQ(d) 算法。

论文里给的一张测试结果,在7 种不同的响应时间分布模型上做对比,比较 SQ(d) 和 JIQ 的响应时间(越小越好)消耗:

PS 和 FIFO 是指任务的执行策略,PS 是 Process-Sharing 策略,来一个跑一个,而 FIFO 就是先入先出策略。R 和 S 开头分别表示采用的是 JIQ-Random 还是 JIQ-SQ(d) 的次级负载均衡策略,后面的数字表示 Processor 和 I-Queue 的比例, 比如 R40 就是采用 JIQ-Random 次级负载均衡策略,同时 processor 数量是 I-Queue 数量的 40 倍。

从结果来看,JIQ 比 SQ(d) 算法在降低响应时间上的结果非常好。其次是 JIQ-SQ(d) 比 JIQ-Random 的总体表现也更好。

论文里有相当多的数学计算来分析整个模型,这一块基本没看懂,有兴趣的地自行研读。

算法扩展

当在非常高负载(At very high load)的情况下,processor 可以在『轻』负载的时候就报告给 I-Queue 说自己现在是空闲状态。比如,processor 当自己的任务队列只有一个任务的时候,将自己加入 I-Queue,在完成任务后再加入一次。这样就可以在 I-Queue 里增加了一份 processor 的拷贝,变相地增加了任务到 Idle Processor 的到达率。

当报告的阈值设置为 2,性能的对比如下, JIQ 仍然表现很好:

Clojure Under a Microscope(1):Clojure 如何理解代码(下)

| Comments

继续上篇,本篇的目的是将 parse 过程介绍完成,然后我们就可以进入编译和运行环节。

目录:

LispReader 补充

上篇在介绍 LispReader 源码核心片段的时候没有介绍最后一个比较关键的代码片段:

1
2
      String token = readToken(r, (char) ch);
      return interpretToken(token);

interpretToken 方法将去解析字符串 token 的含义,token 就是一个词汇单元,它的含义是什么,完全由 interpretToken 决定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static private Object interpretToken(String s) {
  if(s.equals("nil"))
      {
      return null;
      }
  else if(s.equals("true"))
      {
      return RT.T;
      }
  else if(s.equals("false"))
      {
      return RT.F;
      }
  Object ret = null;

  ret = matchSymbol(s);
  if(ret != null)
      return ret;

  throw Util.runtimeException("Invalid token: " + s);
}

代码其实很清楚,token 可能是:

  • nil
  • true
  • false
  • symbol

symbol 再次说明下,你可以暂时理解为 Java 的变量 identifier,指代某个值。

P.S. LispReader 用了相当多的相当复杂的正则表达式来匹配整数、symbol、分数等,这一块也许有性能优化的空间。

Analyze

我们在上一篇提到, LispReader 它只负责解析出一个可以被求值的合法的 form,进行一些初步的语法规则检查,至于这个 form 更进一步是否符合语义,它并不关心。

比如前文举例了 (read-string "(if true 1 2 3)"),它可以被 LispReader 读出来,但是其实是不符合 if 这个 special form 的要求的,因为它只接受两个或者三个参数。

更进一步的检查是放在 clojure.lang.Compiler 中的,这个类也是 Clojure 代码编译成 JVM 所理解的字节码并交由 JVM 运行的核心。

具体地说,是 Compiler 类中的 analyze 方法,它会分析 LispReader 读出来的 form,并转化可以求值的 Expr。如图:

Expr 是一个 Java 接口,它有各种各样的子类,接口本身寥寥几个方法:

1
2
3
4
5
6
7
8
9
interface Expr{
  Object eval() ;

  void emit(C context, ObjExpr objx, GeneratorAdapter gen);

  boolean hasJavaClass() ;

  Class getJavaClass() ;
}

其中:

  • eval 用于对自身求值,得出表达式的结果。
  • emit用于编译,这个编译特指 Clojure 的编译,它会生成 Java 的 class 文件。
  • hasJavaClassgetJavaClass 是为了编译环节做优化引入的,暂时不聊。

关于 evalemit 我们放到下一篇文章的编译再讲。这里还是聚焦在 Compiler 如何 analyze 出 Expr。

analyze 的核心仍然是一个 50 多行的 if else 派发,它会检查传入的 form 的类型,然后根据类型进入更具体的 analyze 方法,比如 (if true 1 2 3) 这是一个 Sequence,那么会进入 analyzeSeq 方法:

1
2
3
4
......
else if(form instanceof ISeq)
              return analyzeSeq(context, (ISeq) form, name);
......             

analyzeSeq 中,它会先判断 form 的第一个元素(first)是否是 fn ,也就是一个函数,如果不是,那么会去检查下是否是 specail form,这里也是一个查表法:

1
2
3
4
5
6
7
8
9
   Object op = RT.first(form);

   IParser p;
  if(op.equals(FN))
      return FnExpr.parse(context, form, name);
  else if((p = (IParser) specials.valAt(op)) != null)
      return p.parse(context, form);
  else
      return InvokeExpr.parse(context, form);

最后,如果不是 fn ,也不是 special forms,那么可能是一个调用(比如函数调用,宏调用,Java method 调用等),那就进入 InvokeExprparse 过程。

special forms 都定义在 specials map 里了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static final public IPersistentMap specials = PersistentHashMap.create(
      DEF, new DefExpr.Parser(),
      LOOP, new LetExpr.Parser(),
      RECUR, new RecurExpr.Parser(),
      IF, new IfExpr.Parser(),
      CASE, new CaseExpr.Parser(),
      LET, new LetExpr.Parser(),
      LETFN, new LetFnExpr.Parser(),
      DO, new BodyExpr.Parser(),
      FN, null,
      QUOTE, new ConstantExpr.Parser(),
      THE_VAR, new TheVarExpr.Parser(),
      IMPORT, new ImportExpr.Parser(),
      DOT, new HostExpr.Parser(),
      ASSIGN, new AssignExpr.Parser(),
      DEFTYPE, new NewInstanceExpr.DeftypeParser(),
      REIFY, new NewInstanceExpr.ReifyParser(),
//       TRY_FINALLY, new TryFinallyExpr.Parser(),
TRY, new TryExpr.Parser(),
THROW, new ThrowExpr.Parser(),
MONITOR_ENTER, new MonitorEnterExpr.Parser(),
MONITOR_EXIT, new MonitorExitExpr.Parser(),
//       INSTANCE, new InstanceExpr.Parser(),
//       IDENTICAL, new IdenticalExpr.Parser(),
//THISFN, null,
CATCH, null,
FINALLY, null,
//       CLASS, new ClassExpr.Parser(),
NEW, new NewExpr.Parser(),
//       UNQUOTE, null,
//       UNQUOTE_SPLICING, null,
//       SYNTAX_QUOTE, null,
_AMP_, null
);

可以看到核心的 special forms 其实是非常少的,所以我一直不理解被人说 clojure 语法复杂。其实它的核心语法就这么 20 几个 special forms。

每个 special form 都对应一个 IParser 的子类:

1
2
3
interface IParser{
  Expr parse(C context, Object form) ;
}

实现其中的 parse 方法,将 form 解析成对应的 Expr 子类。if 对应的就是 IfExpr.ParserIfExpr 了。

我们来画张 UML 图总结下这块结构:

每个 Expr 子类都有个内部类 Parser,它实现了 IParser 接口的 parse 方法,然后解析出对应的外部类这个 Expr 对象。

下面我们来分析一两个 Parser

Parser 举例: if

IfExpr.Parser 为例子,去除一些我们暂时不关心的内容之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Expr parse(C context, Object frm) {
    ISeq form = (ISeq) frm;
     //(if test then) or (if test then else)
    if(form.count() > 4)
          throw Util.runtimeException("Too many arguments to if");
    else if(form.count() < 3)
          throw Util.runtimeException("Too few arguments to if");

    Expr testexpr = analyze(context == C.EVAL ? context : C.EXPRESSION, RT.second(form));
    Expr thenexpr, elseexpr;

    try {
    thenexpr = analyze(context, RT.third(form));
    }finally{......}

    try {
    elseexpr = analyze(context, RT.fourth(form));
    } finally{......}
    return new IfExpr(lineDeref(),
                        columnDeref(),
                        testexpr,
                        thenexpr,
                        elseexpr);
}

这逻辑实在太直白了,所以我注释都没加:

  1. 检查下整个 form 的元素数目是不是在 3 个或者 4 个(为什么不是 2 或者 3?因为if 本身是 form 的第一个元素)。如果参数不对,就报错。
  2. 调用 analyze 递归分析 test 『语句』,也就是 form 的第二个元素。
  3. 调用 analyze 继续分析 then 和 else 『语句』。
  4. 最后,将分析后的结果生成一个 IfExpr 对象返回。

Parser 举例:fn

我们再来分析一个稍微复杂点的,比如 fn,fn 有两种格式:

1
2
3
4
;;定义函数,没有其他重载函数。
(fn name? [params* ] condition-map? exprs*)
;;定义函数,多个重载函数分支,每个都是 ([params* ] condition-map? exprs*) 的格式
(fn name? ([params* ] condition-map? exprs*)+)

前者也是先在 FnExpr.Parser 内转成后者的:

1
2
3
4
//now (fn [args] body...) or (fn ([args] body...) ([args2] body2...) ...)
//turn former into latter
if(RT.second(form) instanceof IPersistentVector)
    form = RT.list(FN, RT.next(form));

然后对 form 做迭代,每个重载分支称为一个 FnMethod,调用 FnMethod.Parser 来解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 for (ISeq s = RT.next(form); s != null; s = RT.next(s)) {
            FnMethod f = FnMethod.parse(fn, (ISeq) RT.first(s), fn.isStatic);
            if (f.isVariadic()) {
                if (variadicMethod == null)
                    variadicMethod = f;
                else
                    throw Util.runtimeException("Can't have more than 1 variadic overload");
            }
            else if (methodArray[f.reqParms.count()] == null)
                methodArray[f.reqParms.count()] = f;
            else
                throw Util.runtimeException("Can't have 2 overloads with same arity");
            if (f.prim != null)
                prims.add(f.prim);
        }

解析出来的 FnMethod 会根据它的参数个数加到数组 methodArray 里。这个数组的大小是 21 个,也就是说任何函数的重载分支不能超过 21 个,需要更多参数的,请使用可变参数 [& args]

其中 variadicMethod 会指向其中的可变参数分支的 FnMethodf.isVariadic 返回真),但是会检查这个分支的参数个数必须比其他分支的参数个数多:

1
2
3
4
for(int i = variadicMethod.reqParms.count() + 1; i <= MAX_POSITIONAL_ARITY; i++)
  if(methodArray[i] != null)
      throw Util.runtimeException(
              "Can't have fixed arity function with more params than variadic function");

举个例子:

1
2
user=> (fn ([a] 1) ([a b c] 3) ([a & args] :variadic))
CompilerException java.lang.RuntimeException: Can't have fixed arity function with more params than variadic function

FnMethod Parser

FnMethod 指代一个函数的重载分支,它的解析也很直白,先解析参数列表:

1
2
3
4
5
6
7
8
9
10
11
12
FnMethod method = new FnMethod(......);
......
PersistentVector argLocals = PersistentVector.EMPTY;
for(int i = 0; i < parms.count(); i++)
{
  if(!(parms.nth(i) instanceof Symbol))
      throw new IllegalArgumentException("fn params must be Symbols");
  Symbol p = (Symbol) parms.nth(i);
  ......
  argLocals = argLocals.cons(lb);
}
method.argLocals = argLocals;

参数一定是 symbol,所有参数收集到一个 PersistentVector 里。

如果有可变参数符号 &,那么就有 restParam:

1
2
3
4
5
6
7
8
static final Symbol _AMP_ = Symbol.intern("&");
......
if(p.equals(_AMP_)){
......
   state = PSTATE.REST;
......
   case REST:
      method.restParm = lb;

然后前文提到的 isVariadic 就是判断有没有 restParm:

1
2
3
4
   //是否是可变参数分支
  boolean isVariadic(){
      return restParm != null;
  }

然后解析函数 body:

1
2
method.body = (new BodyExpr.Parser()).parse(C.RETURN, body);
return method;

解析 Body 的 BodyExpr 又是一个递归调用 analyze 的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 public Expr parse(C context, Object frms) {
      ISeq forms = (ISeq) frms;
      //可能是 (do form1 form2 ...),取 next 部分。
      if(Util.equals(RT.first(forms), DO))
          forms = RT.next(forms);
      PersistentVector exprs = PersistentVector.EMPTY;
      //遍历 body form 列表,收集结果到 exprs vector
      for(; forms != null; forms = forms.next())
          {
          Expr e = (context != C.EVAL &&
                    (context == C.STATEMENT || forms.next() != null)) ?
                   analyze(C.STATEMENT, forms.first())
                                                                      :
                   analyze(context, forms.first());
          exprs = exprs.cons(e);
          }
      //如果 body 为空,返回 nil
      if(exprs.count() == 0)
          exprs = exprs.cons(NIL_EXPR);
      // 否则返回 BodyExpr
      return new BodyExpr(exprs);
  }

Primitive 参数性能优化

细心的朋友可能还注意上面 FnExpr 遍历重载分支的时候有个 prims 的链表,它会将 fn.prime 不为 null 的加进去。它是干什么的呢?

1
2
 if (f.prim != null)
   prims.add(f.prim);

其实这里是 Clojure 编译器为了 long/double 类型的参数避免类型转化和 box/unbox 引入的性能优化。prim 就是 primitive 的意思。当参数的 type hint 包含 double 或者 long 类型的并且参数个数小于4个的时候,生成的 Java 字节码方法将直接传入 long 或者 double 原生类型参数,而不是一般的 Object 类型,避免了转型和装箱拆箱。

FnMethod 相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
method.prim = primInterface(parms);

static public char classChar(Object x){
      Class c = null;
      if(x instanceof Class)
          c = (Class) x;
      else if(x instanceof Symbol)
          c = primClass((Symbol) x);
      if(c == null || !c.isPrimitive())
          return 'O';
      //如果是 long 类型,返回 L 字符
      if(c == long.class)
          return 'L';
      // 如果是 double 类型,返回 D 字符
      if(c == double.class)
          return 'D';
      throw new IllegalArgumentException("Only long and double primitives are supported");
}

static public String primInterface(IPersistentVector arglist) {
      StringBuilder sb = new StringBuilder();
      //拼接所有参数类型字符
      for(int i=0;i<arglist.count();i++)
          sb.append(classChar(tagOf(arglist.nth(i))));
      sb.append(classChar(tagOf(arglist)));
      String ret = sb.toString();
      //如果包含了 D 或者 L,并且参数个数小于 4 个,就认为是一个 prim 分支。
      boolean prim = ret.contains("L") || ret.contains("D");
      if(prim && arglist.count() > 4)
          throw new IllegalArgumentException("fns taking primitives support only 4 or fewer args");
      //特殊命名
      if(prim)
          return "clojure.lang.IFn$" + ret;
      return null;
  }

Clojure 编译器会为每个函数分支都生成一个 invoke 方法。 举例来说,对于方法 (fn [^long a ^String b] :nothing),如果没有这个优化,生成的 Java 方法大概是这样:

1
2
3
4
5
public Object invoke(Object a, Object b){
    long a  = (Long) a;
    String b = (String) b;
    return Keyword.intern("nothing");
}

对于参数 a 来说,为了转化成 long 原生类型需要经过转型和拆箱两个调用,这对于性能敏感的场景是一个损耗。有了这个优化, invoke 方法就可以是这样:

1
2
3
4
public Object invokePrim(long a, Object b){
    String b = (String) b;
    return Keyword.intern("nothing");
}

有兴趣地还可以阅读下这篇博客 warn-on-boxed。这个优化对于数值计算的性能有很大作用,从测试来看,有一倍的提升。

总结

clojure.lang.Compileranalyze 进一步将 LispReader 读出来的 form 解析成了 Expr,等待进一步的求值或者编译成 Java Class 文件。analyze 过程也是一个递归下降解释器的实现,整体实现并不复杂。

结合前文和本篇博客,我们可以给 Clojure 编译器初步画一张流程图,只考虑求值过程,暂不考虑 compile 函数:

接下来,我们即将进入求值和编译环节。

最后附赠一个 Compiler 所有 Expr 子类的 UML 图(单独打开放大):

微信 phxpaxos 源码解读:fsync 和 fdatasync

| Comments

最近在读微信开源的 paxos 实现 phxpaxos,读到 localstorage 部分学习到 fdatasync 系统调用。这一部分是非常核心的存储模块,参与者的状态信息、变更日志等都要写入磁盘并且可能要求强制刷入存储磁盘避免系统崩溃等情况下数据丢失,性能的很大一部分因素取决于这一块的实现。

phxpaxos 使用了 LevelDB 做存储,但是做了几个优化:

  • LevelDB 存储的 value 只是一个 24 个字节的 fileid 索引(有一个例外 minchosen 值),真正的状态数据存储在他们自己实现的 log_store 里, fileid 存储了在 log_store 里的文件编号、写入 vfile 的offset 和 checksum 这三个字段信息。这样做的原因是由于 leveldb 对于比较大的 value 的存取效率不是最优,通过间接一层存储,利用了 LevelDB 的 Key 的有序性和小 value 存储的性能优势,加上定制的 log_store 的写入优化,达到最优组合。
  • log_store 按照文件编号固定增长,比如 1.f2.f3.f ……以此类推(在日志目录的 vfile 目录下)。并且每个文件都是在创建的时候分配固定大小,默认 100M(还有所谓 large buffer 模式分配的是 500M)。值的写入都是 append 操作,写入完成后,将偏移量、校验和、当前文件 ID 再写入 LevelDB。读取的时候就先从 LevelDB 得到这三个信息,然后去对应的 vfile 读取实际的值。因为文件是固定大小分配,每次强制刷盘就不需要调用 fsync,而是采用 fdatasync,这样就无需做两次刷盘,因为 fdatasync 不会去刷入文件的元信息(比如大小、最后访问时间、最后修改时间等),而 fsync 是会的。

一张图来展示大概是这样:

注意到图中还有个 metafile,它存储了 log store 的当前的文件编号这个元信息,写满一个 vfile 就递增下这个计数,并更新到 metafile,为了保证不丢,对它的每次修改都需要强制 fsync。

回到文本的主题 fsync 和 fdatasync,按照 manual 的描述, fsync 同时刷入数据和元信息

1
2
3
4
fsync() transfers ("flushes") all modified in-core data of (i.e., modified
 buffer cache pages for) the file referred to by the file descriptor fd to 
 the disk device ...... It also flushes metadata information associated 
 with the file (see stat(2)).

而 fdatasync 如无必要,只刷入数据:

1
2
3
 fdatasync()  is  similar  to  fsync(),  but  does  not flush modified 
 metadata unless that metadata is needed in order to allow a subsequent 
 data retrieval to be correctly handled.

log_store 的实现因为提前分配了固定大小的文件,因此无需在刷盘的时候再去写入元信息,就可以使用 fdatasync 来优化刷盘性能,而访问时间和修改时间之类的元信息丢失对业务没有影响。

Clojure Under a Microscope(1): Clojure 如何理解代码(上)

| Comments

开篇

最近在读《Ruby Under a Microscope》(已经有中文版《Ruby 原理剖析》)。我很喜欢这本书介绍 Ruby 语言实现的方式,图文并茂,娓娓道来,不是特别深入,但是给你一个可以开始学习 Ruby 源码的框架和概念。

我大概在 2-3 年前开始阅读 Clojure Java 实现的源码,陆陆续续也有一些心得,想着可以在读 Ruby 这本书的时候,按照这本书的思路梳理一遍。因此就有了这第一篇: Clojure 如何理解代码。

目录:

IO Reader

我们抛开 leiningen 等构建工具,Clojure 唯一需要的是 JVM 和它的 jar 包,运行一段简单的 clojure 代码,可以这样:

1
2
$ java -cp clojure.jar  clojure.main -e "(println (+ 2 2))"
4

clojure.main 是所有 clojure 程序的启动入口,关于启动过程,后续会有单独一篇博客来介绍。-e 用来直接执行一段传入的 clojure 代码。

当 clojure 读到 (println (+ 2 2)) 这么一行代码的时候,它看到的是一个字符串。接下来它会将这段字符串拆成一个一个字符来读入,也就是

1
( p r i n t l n   ( +   2   2 ) )

这么一个字符列表。这是通过 java.io.PushBackReader 来完成。 Clojure 内部封装了一个 LineNumberingPushbackReader 的类继承了 PushbackReader ,并且内部封装了 Java 标准库的 LineNumberReader 来支持记录代码行列号(为了调试、报错、记录元信息等目的),并且最重要的是支持字符的回退(unread),它可以将读出来的字符『吐』回去,留待下次再读。内部其实就是一个回退字符缓冲区。

我们来试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
(def r
  (-> "(println (+ 2 2))"
      (java.io.StringReader.)
      (clojure.lang.LineNumberingPushbackReader.)))

(.read r)            ; => 40  '('
(.read r)            ; => 112 'p'
(.read r)            ; => 114 'r'
(.unread r 114)      ; 『吐』回 'r' 
(.read r)            ; => 114 'r'   
(.read r)            ; => 105 'i' 
(.getLineNumber r)   ;  获取行号,从 1 开始
(.getColumnNumber r) ;  获取列号,从 0 开始
......

read 返回的的是字符串的整数编码(0 – 65535),Clojure 默认使用的是 UTF-8 编码。查看一个字符的整数编码可以 int 强制转换:

1
2
(int \()  ; => 40
(int \你) ; => 20320

上面的例子中我们 unread 了 114(也就是字符 ‘r’),然后下次调用 read,返回的仍然是 114。Clojure 的词法解析需要依赖这个回退功能。

此外还可以通过 getLineNumbergetColumnNumber 获取代码的行号和列号。这个行列信息最终会在 Clojure 对象的 metadata 里,比如我们看下 + 这个函数的行列信息:

1
2
user=> (select-keys (meta #'clojure.core/+) [:column :line :file])
{:column 1, :line 965, :file "clojure/core.clj"}

LispReader

单个字符是没有意义,接下来,Clojure 需要理解这些字符组成的字符串是个什么东西,理解了之后才能去执行求值。

这个『东西』,在 Clojure 里定义为 form。form 其实不是 clojure 特有的概念,而应该说是 lisp 系的语言都有一个概念。form 该怎么理解呢? 粗糙地理解,它是 Clojure 的对象,对应了一种 clojure 数据类型。更精确地说,form 是一个可以被正常求值的『程序单元』。

form 可以是:

  • Literals 字面量,比如字符、字符串、数字、nil、true/false 布尔值等等。
  • Symbol 符号,可以先简单地理解成类似 Java 的变量名称 identifier。
  • Lists 括号括起来的列表,如 (a b c)
  • Vectors 这是 clojure 有别于其他 lisp 方言的地方,中括号括起来的列表 [1 2 3]
  • Maps 散列表 {:a 1 :b 2}
  • Sets/Map namespace(1.9 新增)、deftype、defrecord 等其他类型。

那么 Clojure 是怎么将上面 reader 读到的字符流理解成 form 的呢?这是通过 LispReader 来完成,他负责将字符流解析成 form。我们尝试调用它的 read 方法来读取下 "(println (+ 2 2))",看看结果是什么:

1
2
3
4
5
 (def r
  (-> "(println (+ 2 2))"
      (java.io.StringReader.)
      (clojure.lang.LineNumberingPushbackReader.)))
 (def form (clojure.lang.LispReader/read r nil))

查看下 form:

1
2
3
4
user=> form
(println (+ 2 2))
user=> (class form)
clojure.lang.PersistentList

这个 form 的『样子』和它的文本字符串是一模一样的 (println (+ 2 2)),可是它不是字符串了,而是一个 List —— Clojure 的数据结构也是最重要的数据结构。这个一模一样就是所谓的同像性,也就是 Homoiconicity。因为 form 其实就是 AST,(println (+ 2 2)) 是一个层次的嵌套结构,转换成树形如下:

image

对应的刚好也是语法树,那么同像性就赋予我们操作这棵语法树的能力,因为它本质上就是一个普通的 Clojure 『对象』,也就是 form。我们可以随心所欲的操作这个 form,这也是 Clojure 强大的元编程能力的基础。

如果对应到编译原理, LispReader 不仅是 Lexer,同时也是 Parser。除了读取解析出词法单元之外,还会检查读取的结果是否是一个合法的可以被求值的 form,比如我们故意少一个括号:

1
2
user=> (read-string  "(+ 1 2")
RuntimeException EOF while reading  clojure.lang.Util.runtimeException (Util.java:221)

read-string 和另一个函数 read 最终调用的还是 LispReader,因为少了个括号,它会报错,这不是一个合法的 form。

Clojure 的编译器是 one-pass 还是 two-pass?

编译器可以多遍扫描源码,做分词、解析、优化等等工作。那么 Clojure 编译器是几遍?

严格来讲, Clojure 的编译器是 two-pass 的,但是很多情况下都是 one-pass 的。

但是 pass 这个概念在 clojure 里不是特别合适,按照 Rich Hickey 的答复,Clojure 的编译器更多是按照一个一个编译单元来描述更合适。每个单元是一个顶层(toplevel) form。

比如你有一个 clojure 代码文件:

1
2
3
(def a 1)
(def b 2)
(println (+ 1 2))

clojure 编译器会认为这里有三个顶层编译单元,分别是 (def a 1)(def b 2)(println (+ 1 2)),这三个编译单元都是最顶层的 form,它们会按照在文件中的出现顺序一一编译。

正因为编译单元要按照这个顺序,因此其实 clojure 不支持循环引用,或者前向查找(但是特别提供了 declare):

1
2
(def b 2)
(println (+ a b))

第二个 form 将报错,因为找不到 a:

1
 Unable to resolve symbol: a in this context

请注意,前向查找跟多少遍扫描没有关系,一遍扫描也可以实现前向查找。Clojure 这里的选择是基于两个理由:编译性能和名称冲突考虑。参见这个 YC 上的回复

LispReader 实现

LispReader 的实现是一个典型的递归下推机,往前读一个字符,根据这个字符的类型通过一系列 if 语句判断要执行哪一段解析,完整代码在 github,核心的循环代码精简如下,并加上注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
for(; ;){
           //读取到一个 List,返回。
          if(pendingForms instanceof List && !((List)pendingForms).isEmpty())
              return ((List)pendingForms).remove(0);

          //读一个字符
          int ch = read1(r);

          //跳过空白,注意,逗号也被认为是空白
          while(isWhitespace(ch))
              ch = read1(r);

          //读到末尾
          if(ch == -1)
              {
              if(eofIsError)
                  throw Util.runtimeException("EOF while reading");
              return eofValue;
              }

          //读到设定的返回字符,提前返回。
          if(returnOn != null && (returnOn.charValue() == ch)) {
              return returnOnValue;
          }

          //可能是数字
          if(Character.isDigit(ch))
              {
              Object n = readNumber(r, (char) ch);
              return n;
              }

          //根据字符,查找 reader 表,走入更具体的解析
          IFn macroFn = getMacro(ch);
          if(macroFn != null)
              {
              Object ret = macroFn.invoke(r, (char) ch, opts, pendingForms);
              //no op macros return the reader
              if(ret == r)
                  continue;
              return ret;
              }
          //如果是正负符号,进一步判断可能是数字
          if(ch == '+' || ch == '-')
              {
              //再读一个字符
              int ch2 = read1(r);
              //如果是数字
              if(Character.isDigit(ch2))
                  {
                  //先回退 ch2 ,继续调用 readNumber 读出数字。
                  unread(r, ch2);
                  Object n = readNumber(r, (char) ch);
                  return n;
                  }
              //不是数字,回退 ch2
              unread(r, ch2);
              }
          //读取 token,并解析
          String token = readToken(r, (char) ch);
          return interpretToken(token);
          }
}

LispReader 维护了一个字符到 reader 的映射,专门用于读取特定的 form,也就是上面 getMacro 用到的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   static IFn[] macros = new IFn[256]; //特殊宏字符到 Reader 函数的映射
  macros['"'] = new StringReader();  // 双引号开头的使用字符串Reader
  macros[';'] = new CommentReader();  // 注释
  macros['\''] = new WrappingReader(QUOTE); // quote 
  macros['@'] = new WrappingReader(DEREF);// deref符号 @
  macros['^'] = new MetaReader();   //元数据
  macros['`'] = new SyntaxQuoteReader(); // syntax quote
  macros['~'] = new UnquoteReader();   // unquote
  macros['('] = new ListReader();      //list 
  macros[')'] = new UnmatchedDelimiterReader();  //括号不匹配
  macros['['] = new VectorReader();   //vector
  macros[']'] = new UnmatchedDelimiterReader();  // 中括号不匹配
  macros['{'] = new MapReader();     // map
  macros['}'] = new UnmatchedDelimiterReader();  // 大括号不匹配
  macros['\\'] = new CharacterReader();   //字符,如\a
  macros['%'] = new ArgReader();   // 匿名函数便捷记法里的参数,如%, %1
  macros['#'] = new DispatchReader();  // #下面将提到的 dispatch macro
  
  static private IFn getMacro(int ch){
    if(ch < macros.length)
        return macros[ch];
    return null;
   }

ListReader 实现解析

我们先看下 ListReader,它是一个普通的 Clojure 函数,继承 AFn,并实现了 invoke 调用方法,关于 Clojure 的对象或者说运行时模型,我们后文再谈,ListReader 核心的代码如下:

1
2
3
List list = readDelimitedList(')', r, true);
IObj s = (IObj) PersistentList.create(list);
return s;

调用了 readDelimitedList 获取了一个 List 列表,然后转换成 Clojure 的 PersistentList 返回。readDelimitedList 的处理也很容易理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//收集结果
ArrayList a = new ArrayList();

for(; ;)
  {
  int ch = read1(r);
  //忽略空白
  while(isWhitespace(ch))
      ch = read1(r);
  //非法终止
  if(ch == -1)
      {
      if(firstline < 0)
          throw Util.runtimeException("EOF while reading");
      else
          throw Util.runtimeException("EOF while reading, starting at line " + firstline);
      }
  //读到终止符号,也就是右括号),停止
  if(ch == delim)
      break;
  //可能是macro fn
  IFn macroFn = getMacro(ch);
  if(macroFn != null)
      {
      Object mret = macroFn.invoke(r, (char) ch);
      //no op macros return the reader
      
      //macro fn 如果是no op,返回reader本身
      if(mret != r)
          //非no op,加入结果集合
          a.add(mret);
      }
  else
      {
      //非macro,回退ch
      unread(r, ch);
      //读取object并加入结果集合
      Object o = read(r, true, null, isRecursive);
      //同样,根据约定,如果返回是r,表示null
      if(o != r)
          a.add(o);
      }
  }
//返回收集的结果集合

return a;

再举一个例子,MetaReader,用于读取 form 的元信息。

MetaReader 解析

Clojure 可以为每个 form 附加上元信息,例如:

1
2
user=> (meta (read-string "^:private (+ 2 2)"))
{:private true}

通过 ^:private,我们给 (+ 2 2) 这个 form 设置了元信息 private=true。当 LispReader 读到 ^ 字符的时候,它从 macros 表找到 MetaReader,然后使用它来继续读取元信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
   //meta对象,可能是map,可能是symbol,也可能是字符串,例如(defn t [^"[B" bs] (String. bs))
   Object meta = read(r, true, null, true);
  //symbol 或者 字符串,就是简单的type hint tag
  if(meta instanceof Symbol || meta instanceof String)
      meta = RT.map(RT.TAG_KEY, meta);
  //如果是keyword,证明是布尔值的开关变量,如 ^:dynamic ^:private
  else if (meta instanceof Keyword)
      meta = RT.map(meta, RT.T);
  //如果连 map 都不是,那很抱歉,非法的meta数据
  else if(!(meta instanceof IPersistentMap))
      throw new IllegalArgumentException("Metadata must be Symbol,Keyword,String or Map");

  //读取要附加元数据的目标对象
  Object o = read(r, true, null, true);
  if(o instanceof IMeta)
      //如果可以附加,那么继续走下去
      {
      if(line != -1 && o instanceof ISeq)
          {
          //如果是ISeq,加入行号,列号
          meta = ((IPersistentMap) meta).assoc(RT.LINE_KEY, line).assoc(RT.COLUMN_KEY, column);
          }
      if(o instanceof IReference)
          {
          //如果是 ref,重设 meta
          ((IReference)o).resetMeta((IPersistentMap) meta);
          return o;
          }
      //增加 meta 到原有的 ometa
      Object ometa = RT.meta(o);
      for(ISeq s = RT.seq(meta); s != null; s = s.next()) {
      IMapEntry kv = (IMapEntry) s.first();
      ometa = RT.assoc(ometa, kv.getKey(), kv.getValue());
      }
      //关联到o
      return ((IObj) o).withMeta((IPersistentMap) ometa);
      }
  else
      //不可附加元素,抱歉,直接抛出异常
      throw new IllegalArgumentException("Metadata can only be applied to IMetas");

从代码里可以看到,不是所有 form 都可以添加元信息的,只有实现 IMeta 接口的 IObj 才可以,否则将抛出异常:

1
2
user=> ^:private 3
IllegalArgumentException Metadata can only be applied to IMetas  clojure.lang.LispReader$MetaReader.invoke (LispReader.java:820)

Dispatch Macros

Clojure 同时还支持 # 字符开始的所谓 dispatch macros,比如正则表达式 #"abc" 或者忽略解析的 #_(form)。这部分的解析也是查表法:

1
2
3
4
5
6
7
8
9
dispatchMacros['^'] = new MetaReader();  //元数据,老的形式 #^
dispatchMacros['\''] = new VarReader();   //读取var,#'a,所谓var-quote
dispatchMacros['"'] = new RegexReader();  //正则,#"[a-b]"
dispatchMacros['('] = new FnReader();    //匿名函数快速记法 #(println 3)
dispatchMacros['{'] = new SetReader();   // #{1} 集合
dispatchMacros['='] = new EvalReader();  // eval reader,支持 var 和 list的eval
dispatchMacros['!'] = new CommentReader();  //注释宏, #!开头的行将被忽略
dispatchMacros['<'] = new UnreadableReader();   // #< 不可读
dispatchMacros['_'] = new DiscardReader();   //#_ 丢弃

LispReader 读到 # 字符的时候,会从 macros 表找到 DispatchReader,然后在 DispatchReader 内部继续读取一个字符,去 dispatchMacros 找到相应的 reader 进行下一步解析。

更多 Reader 源码解析,可以参考我的注解,或者自行研读。

本篇总结

一张图来总结本篇所介绍的内容:

reader

Clojure 在从文件或者其他地方读取到代码文本后,交给 IO Reader 拆分成字符,然后 LispReader 将字符流解析成可以被求值的 form。

我们前面提到 LispReader 同时是 Lexer 和 Parser,但是它并不是完整意义上的 Parser,比如它不会去检查 if 的使用是否合法:

1
2
3
4
user=> (read-string "(read-string "(if 1 2 3 4)")")
(if 1 2 3 4)
user=> (if 1 2 3 4)
CompilerException java.lang.RuntimeException: Too many arguments to if, compiling:(NO_SOURCE_PATH:93:1)

LispReader 只会检查它是否是一个合法的 form,而不会去检查它的语义是否正确,更进一步的检查需要 clojure.lang.Compiler 介入了,它会执行一个 analyze 解析过程来检查,这是下一篇要讲的内容。

Clojure 并发实践:使用 pmap 加速程序

| Comments

LeanCloud 的控制台会展示一个应用列表,应用列表会展示该用户的所有应用,以及每个应用的基本信息,例如总用户数、昨天请求量和本月请求量等。我们最多允许每个用户创建 50 个应用。伪代码大概是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
(defn add-app-info
  "添加应用统计信息。"
  [app]
  (assoc app
         :yesterday_reqs (count-reqs app 7)
         :monthly_reqs (count-reqs app 30)
         :total_users (count-users app)))

(defn get-client-apps
  "获取用户的应用列表"
  [client_id]
  (->> client_id
       (db/find-apps-by-client-id)
       (map add-app-info)))

显然,这里每个应用为了获取这些请求信息,都至少要请求三次。虽然这些统计请求本身已经有了缓存,但是假设有 50 个应用(实际中,部分开发者的应用数量包括协作应用在内会更多),那就需要发起 150 个请求,这个过程如果完全串行处理的话,假设 add-app-info 的开销至少是 1~3 毫秒,串行处理下来也需要 50~150 毫秒,加上传输的时间,那么用户的体验的就相当差了。

这时候,我们可以用并发处理来加速了,你只需要替换一个函数,将 get-client-appsmap 替换为 pmap 即可:

1
2
3
4
5
6
(defn get-client-apps
   "获取用户的应用列表"
  [client_id]
  (->> client_id
       (db/find-apps-by-client-id)
       (pmap add-app-info)))

关于 pmap 的讨论参见 并发函数pmap、pvalues和pcalls。因为 pmap 对于 chunked sequnce 的处理是批量处理,因此最多同时使用 32 个并发任务在处理,这个线程数量在这个场景下是可以接受的。加速后的性能也可以估算出来 (Math/round (/ n 32.0)) x (1~3) 毫秒。 在实际线上环境中,大概加速了 3~4 倍左右。

在测量性能的时候,注意使用 doall,因为 clojure 的 LazySeq 特性会干扰你的测试。

一个函数替换就能获得并发加速,抽象的力量在这里体现的淋漓尽致。

Xmemcached 死锁分析和 Aviator 可变参数方法实现

| Comments

首先是 xmemcached 发了 2.2.0 版本,最重要解决的问题就是请求超时。详细的情况可以参考这个 issue 。推荐所有还在用 xmc 的朋友升级到这个版本,性能和稳定性都有所改进。

这个 bug 的原因可能更值得说道说道。

xmemcached 本身会对发出去的请求维护一个队列,在 onMessageSent 也就是消息写到 socket 后将请求放入队列,然后在收到 memcached 返回应答的时候,找出当前的请求来 decode 应答内容。伪代码是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
//Handler 里加入队列。
public void onMessageSent(Command msg, Session session){
    session.getQueue().offer(msg);
}

//Decoder 里做解码
public Command decode(ByteBuffer buf, Session session){
    Command cmd = session.getQueue().take().decode(buf);
    if(cmd!=null)
        return cmd;
    else
        return null;
}

这个 Bug 的关键就在于加入队列的时候和 take 的使用。 take 会阻塞当前操作,直到队列中有可用的元素或者被中断。而我们放入队列的时候是在命令被完全写入 socket 之后(有没有发出去,你无法确认的,因为有 socket 缓冲区、网卡缓冲区的存在)。其次是这两段逻辑是发生在同一个处理线程上。

那么当用户写入一个超过 1M 的数据的时候,假设是 2M。因为 memcached 最多只允许保存 1M 大小的数据,当 xmemcached 将超过 1M 但是还没有达到 2M 的数据发送到 memcached 后, memcached 立即应答返回错误。但是此时,数据还没有完全写出去,导致命令没有被加入队列,同时 take 也取不到数据,我们遇到了死锁: take 在等待命令加入,而写入命令数据的线程被 take 阻塞了没有机会继续写

找到问题解决就很简单了: 将放入队列的时机从完全写入后,放到开始写第一个字节之前的时候;或者将 take 改成 poll,不阻塞当前线程,当没有可用命令的时候会去重试。我选择了前者的方案。因为 memcached 的典型场景是读多写少,更希望尽快地 decode 出结果来响应。

Aviator 的自定义函数是继承 AbstractFunction,然后复写特定参数 arity 的方法实现即可。但是后来有用户发现无法处理超过 20 个参数的可变参数。回顾下代码,确实是没有处理这个逻辑。当超过 20 个参数的时候,应该将多余的参数封装成数组来调用。

Java 里的可变参数就是数组,一个简单例子:

1
2
3
4
5
6
7
8
public class Test{
    public static void method(int a, int ...b){
    }

    public static void main(String []args){
        method(1, 2, 3, 4);
    }
}

编译后 javap -v Test 看看字节码,关键是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
     0: iconst_1
     1: iconst_3
     2: newarray       #int,创建数组
     4: dup
     5: iconst_0
     6: iconst_2
     7: iastore        #将 2 放入数组
     8: dup
     9: iconst_1
    10: iconst_3
    11: iastore        #将 3 放入数组
    12: dup
    13: iconst_2
    14: iconst_4
    15: iastore        #将4放入数组
    16: invokestatic  #2                  // Method method:(I[I)V

最终调用的方法签名是 (I[I)V,也就是 void (int a, int [] b)

Aviator 的 Parser 是 one pass 的,解析的同时生成字节码,就是一个典型的递归下降解释器。当遇到方法调用,因为是一遍扫描,在完全解析完整个方法调用之前,我是不知道有多少个参数的,因此就不知道应该创建多大的额外参数数组。

遇到这种不知道多少元素将加入的集合的问题,那肯定不能用数组了,直接用 List 吧。基本的实现逻辑也很简单,当解析到第 20 个参数,就创建一个 List 实例,后面再解析到的参数就加入这个 List。到解析方法调用完成后,将前面的 20 个参数,加上 list 里面的元素组成的数组,一起做个调用,伪代码类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void onMessageParam(Token token){
    if(getEnv().getParamsNumber()) > 20){
        List list = getEnv().getOrCreateParamList();
        list.add(token);
    }
    onTernary(token);
}

public void onMessageInvoke(Token token){
    List list = getEnv().getOrCreateParamList();
    AviatorObject [] extraParams = new AviatorObject[list.size()];
    extraParams = list.toArray(extraParams);

    ......
    function.call(param1, param2, ......., extraParams);
}

当然实际的代码是用 ASM 直接写成字节码的。