庄周梦蝶

生活、程序、未来

自旋锁的优化

| Comments

自旋锁的提出主要也是为了解决多核系统的同步问题。当锁要保护的代码块执行很快的时候,并且争抢不是非常激烈的时候,自旋锁的比之重量级的需要切换上下文的互斥锁能带来更好的性能表现。对于非多核系统,用户态的自旋锁空耗 CPU,反而降低了整个系统的 progress,作用反而不大。

此外,当保护的代码块执行较为耗时,或者自旋锁的争抢非常激烈的时候,自旋锁本身就消耗了大量无谓的 CPU ,这种情况下还不如使用互斥锁,让出 CPU 给任务执行,提高实际的 CPU 利用率。

我在前面一篇介绍 CPU 高速缓存的博客,举了个例子用 AtomicInteger 的 compareAndSet 实现一个自旋锁,主要是为了演示在 SMP 下通过增加一个内循环,来减少锁状态 state 在多个 CPU 之间的『颠簸』。

但是其实这个例子如果用 synchronized 或者 ReentrantLock 改写,都会快得多。比如换成 ReentrantLock,

1
2
3
4
5
6
7
8
9
10
private ReentrantLock lock =  new ReentrantLock();

/**
 * 递增 count,使用 ReentrantLock 保护 count
 */
public void incr() {
    lock.lock();
    count = count + 1;
    lock.unlock();
}

运行时间下降到了 150~230 毫秒,比之原来测试的两个版本快了一个数量级。

原因就在于递增 +1 这个操作非常快,导致自旋锁版本的代码线程争抢锁非常激烈,大家抢的头破血流,但是任务却没有多大进展,空耗了大量 CPU。就好像一堆人抢着去上卫生间,大家都不排队,你挤我抢,反而堵在了门口,卫生间的实际利用率下降了。从这个生活中的例子出发,我们可以想到一些优化的方法,比如分散每个人的争抢时间,比如排队。

因此,自旋锁的优化有一些思路:

首先是退让(Back off),每个线程在争抢锁之前等待一小会,通常第一次不退让,但是之后就按照某个规则变化这个退让时间,我们修改下原来的 BusyLock 例子来演示这个优化,这里采用指数退让:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 //最大退让时间 10 毫秒
 private static final int max_backoff_msg=10;

     /**
     * 利用 CAS 实现自旋锁
     */
    private void lock() {
        //其实退让时间 1 毫秒。
        int backoff=1;
        while (!state.compareAndSet(0, 1)){
            while (state.get() == 1)
                ;
            //不是第一次,退让一下
            if(backoff>1){
                try{
                    Thread.sleep(backoff);
                }catch(Exception InterruptedException){
                    Thread.currentThread().interrupt();
                }
            }
            //还没有超过最大退让时间,指数增加退让时间。
            if(backoff<max_backoff_msg){
                backoff*=2;
            }
        }
    }

通过引入一个退让机制,我们重新跑下原来的测试,整体执行时间下降到了 120~200 毫秒了,这就跟 ReentrantLock 差不了太多了。退让时间可以是静态不变的,或者动态变化的,动态变化可以是这里的指数递增,或者随机生成。不同的策略也会带来不同的影响。

但是呢,这个实现是否比 ReentrantLock 的实现更好? 肯定不是。首先没有处理可重入锁,也不支持 tryLock 超时、取消等;其次,用的 sleep 做退让,最小单位是毫秒,并且 sleep 精度无法保证。另外,没有任何公平性可言,等待最久的线程可能退让最多,新来的很可能先抢到锁等等。

这就引出了另一个优化思路:排队(Queuing Lock)。这其实就是 ReentrantLock 的实现方式。ReentrantLock 有两种模式:公平和非公平,通常情况都推荐你使用非公平版本,也就是默认的。对于非公平版本的 lock 实现如下:

1
2
3
4
5
6
7
8
9
    * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

先尝试用 CAS 获取锁,如果成功了,那就设置独占线程为自己,如果失败,进入 AQS 框架的 aquire 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

aquire 里同样先尝试下能不能获取,不行就去调用 addWaiteracquireQueued 去尝试排队,如果还是不行,就中断取消。

排队过程也是一个 lock-free 的循环过程。acquireQueued 仍然会去自旋尝试获取自旋锁,如果仍然继续失败,就调用 park 让出 CPU 不再参与调度,等锁被释放的时候被前驱节点的线程(释放锁的线程)唤醒再次参与争抢。最后,通过引入队列禁止抢占,也可以支持严格的公平锁了。

关于 AQS 和 ReentrantLock 源码解析,网上资料多如牛毛,这里也不重复了。

排队模式的自旋锁目前有几种:基于票据(ticket),每个线程分配一个票据,轮到自己就获得锁;MCS 锁CLH Lock,都是基于链表, AQS 框架从一定程度上是 CLH 的变种实现。

关于更多自旋锁的优化请参考 Spin Lock Performance,更仔细的设计需要将硬件的多核模型考虑进入。

题外话:单纯看一个东西源码,只能了解一个实现是什么样的,但是不知道为什么这样实现,有时候需要换个角度去思考,并且阅读一些相关方面的资料,才能理解作者为何如此设计等等。

分布式一致性论文阅读阶段性小结

| Comments

这个月阅读集中在分布式一致性和存储方面。

首先是 Paxos 系列论文:

  • 《Paxos Made Simple》,循循渐进地讲解 paxos 解决的问题、逐步增强的约束条件(P1、P2、P2a – P2c)等,P1 保证至少有一个值被接受, P2 保证只有一个被选中的值被所有 process 接受。然后介绍两阶段的步骤:

    • Phase 1.
      • (a) A proposer selects a proposal number n and sends a prepare request with number n to a majority of acceptors. * (b) If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered pro- posal (if any) that it has accepted.
    • Phase 2.
      • (a) If the proposer receives a response to its prepare requests (numbered n) from a majority of acceptors, then it sends an accept request to each of those acceptors for a proposal numbered n with a value v, where v is the value of the highest-numbered proposal among the responses, or is any value if the responses reported no proposals. * (b) If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request having a number greater than n.

加粗的两句话就是 paxos 的核心,一个是 acceptor 的承诺,一个是 proposer 的选择。达成一致的值之后,接下来论文讨论了其他 process 学习机制和 liveness 问题,paxos 无法解决活锁问题,但是实际应用中这个问题可以通过选举一个唯一的 proposer 来避免。接下来谈到实现,说明一个 Replication State Machine 的实现问题,着重讨论了 leader failure 情况下的『空洞』问题,通过引入 no-op commands 来减少服务中断时间。但是这篇论文没有明确提到 multi paxos 的介绍,这就需要继续阅读其他论文和博客来理解。

  • 《Paxos Made live》。这篇论文是 Google 发表的,讨论了 paxos 的工程实践,也就是 chubby 这个众所周知的分布式服务的实现,可以结合《The Chubby lock service for loosely-coupled distributed systems》 一起看。实际应用中的难点,比如 master 租约实现、group membership 变化、Snapshot 加快复制和恢复以及实际应用中遇到的故障、测试等问题,特别是最后的测试部分。非常值得一读。《The Chubby lock service for loosely-coupled distributed systems》 更多介绍了 Chubby 服务本身的设计决策,为什么是分布式锁服务,为什么是粗粒度的锁,为什么是目录文件模式,事件通知、多机房部署以及应用碰到的使用问题等等。

  • Paxos Made Practical。这篇论文更详细地讨论了 State Machine 的实现,甚至还带上了 C语言的伪代码,定义了 prosper 、 acceptor 以及 SM 本身需要实现的接口和通讯协议,更重要的是讨论了 membership change 的问题,通过引入 view 视图的概念,介绍了 view-change 协议来解决成员变动问题(可能是故障或者上下线新成员),按我的理解,这个过程也是 paxos 的应用。最后介绍了可能的优化手段。

view change 协议 (view change 协议)

  • Paxos 我还着重推荐阅读微信后端团队写的系列博客,包括他们开源的 phxpaxos 实现,基本上将所有问题都讨论到了,并且通俗易懂。

其次,一致性方面另一块就是 Raft 算法,按照 Google Chubby 论文里的说法,

1
2
Indeed, all working protocols for asynchronous consensus we have so far 
encountered have Paxos at their core.

但是 Raft 真的好理解多了,我读的是《In Search of an Understandable Consensus Algorithm》,论文写到这么详细的步骤,你不想理解都难。毕竟 Raft 号称就是一个 Understandable Consensus Algorithm。无论从任何角度,都推荐阅读这一篇论文。

首先能理解 paxos 的一些难点,其次是了解 Raft 的实现,加深对 Etcd 等系统的理解。这篇论文还有一个 250 多页的加强版《CONSENSUS: BRIDGING THEORY AND PRACTICE》,教你一行一行写出一个 Raft 实现,我还没有学习,有兴趣可以自行了解。Raft 通过明确引入 leader(其实 multi paxos 引申出来也有,但是没有这么明确的表述)来负责 client 交互和日志复制,将整个算法过程非常清晰地表达出来。Raft 的算法正确性的核心在于保证 Leader Completeness ,选举算法选出来的 leader 一定是包含了所有 committed entries 的,这是因为所有 committed entries 一定会在多数派至少一个成员里存在,所以设计的选举算法一定能选出来这么一个成员作为 leader。多数派 accept 应该说是一致性算法正确性的最重要的保证。

最后,我还读了《Building Consistent Transactions with Inconsistent Replication》,包括作者的演讲,作者也开放了源码。Google Spanner 基本是将 paxos 算法应用到了极致,但是毕竟不是所有公司都是这么财大气粗搞得起 TrueTime API,架得起全球机房,控制或者承受得了事务延时。这篇论文提出了另一个思路,论文介绍的算法分为两个层次: IR 和基于其他上的 TAPIR。前者就是 Inconsistent Replication,它将操作分为两类:

  • inconsistent: 可以任意顺序执行,成功执行的操作将持久化,支持 failover。
  • consensus:同样可以任意顺序执行并持久化 failover,但是会返回一个唯一的一致(consensus)结果。

IR 的调用图:

可见他需要服务端和客户端共同参与,对于 consensus 操作,如果 replicas 之间有冲突,会在客户端引入一个 decide 过程来决定使用哪一个值,相应地,在服务端为了解决 master 和 replicas 的不一致问题,引入了 sync/merge 过程来解决冲突,master 运行 merge 过程来解决 consensus 操作的副本冲突,而所有节点运行 sync 过程来同步 master 记录。关于 sync/master 的描述看原文:

1
2
3
4
5
6
7
8
9
10
11
12
"Some replicas may miss operations or need to reconcile their state if the 
consensus result chosen by the application (i.e., transaction) protocol does 
not match their result. To ensure that IR replicas eventually converge, they 
periodically synchronize. Similar to eventual consistency, IR relies on the 
application (i.e., transaction) protocol to reconcile inconsistent replicas.
 On synchronization, a single IR node first upcalls into the application 
 protocol with Merge, which takes records from inconsistent replicas and 
 merges them into a master record of successful operations and consensus 
 results. Then, IR upcalls into the application (i.e., transaction) protocol 
 with Sync at each replica. Sync takes the master record and reconciles 
 application (i.e., transaction) protocol state to make the replica consistent 
 with the chosen consensus results."

为了保证正确性,IR 对上层应用层协议有特殊的要求:

  • Invariant checks must be performed pairwise.也就是要求任意两个 consensus 操作,其中一个至少对另一个是可见的。不然无法检测是否冲突。
  • Application protocols must be able to change consensus operation results.对于已经达成一致的结果,还要允许是可以被修改的,merge 过程会修改原来认为的一致的结果,这是不一致复制必然带来的问题。
  • 性能原则 1:Application protocols should not expect operations to execute in the same order. 对于顺序不要有任何假设。
  • 性能原则 2:Application protocols should use cheaper inconsistent operations whenever possible rather than consensus operations. 尽量用 inconsistent 操作。比如在 TAPIR 里只有 prepare 是 consensus 类型操作。

正因为对于应用层协议有这么多的限制,因此论文提出了 TAPIR 这个算法来解决事务的 linearizable ordering 问题。TAPIR 的具体算法请阅读论文吧,这里不再复述。大体的思路就是客户端参与事务的冲突检测(OCC validation checks),Leader 执行IR 的 merge 过程,对于还没有committed 的事务(可能 abort ,也可能来不及提交),重新跑一遍 OCC 检测冲突,根据结果来决定最终是提交还是回滚。

对于复制和恢复的描述:

1
2
3
4
5
TAPIR’s sync function runs at the other replicas to reconcile TAPIR state with
 the master records, correcting missed operations or consensus results where 
 the replica did not agree with the group. It simply applies operations and
  consensus results to the replica’s state: it logs aborts and commits, and 
  prepares uncommitted transactions where the group responded PREPARE-OK.

传统两阶段提交, Google spanner 之类的思路:

TAPIR 的流程:

关于 TAPIR 的解读推荐两篇博客:Building Consistent Transactions with Inconsistent ReplicationPaper review: Building Consistent Transactions with Inconsistent Replication (SOSP’15)。 TAPIR 的源码只包含了 normal case 的处理,恢复之类的过程都是没有的,对于 recovery 的一些疑问,可以参考 A FEW WORDS ABOUT INCONSISTENT REPLICATION (IR),同样也是我的疑问,这在实际工程中是非常重要的部分,但是论文却是匆匆带过。

最近还回顾了不少分布式存储的论文, GFS、BigTable、Dynamo 等,改天再做个总结吧。

Join Idle Queue 负载均衡算法解析

| Comments

JIQ 是微软发的一篇论文《Join-Idle-Queue: A Novel Load Balancing Algorithm for Dynamically Scalable Web Services》里描述的负载均衡算法,这里总结下我所理解的内容。

背景

负载均衡很常见,比如我们经常用 nginx 做反向代理和负载均衡,Nginx 也支持了 weight、ip_hash、url_hash 等均衡算法。

负载均衡的图示:

任务 jobs 不停地经由多个 dispatcher 转发给后面的 processor server 处理。

dispatcher 选择哪一条 processor 来转发任务的过程就是 load balance 的核心问题。尽量降低任务的响应时间是我们的目标。

最简单的算法可能是随机或者轮询,但是这种简单的策略会造成响应时间的最大化,特别是高负载的情况下。

更优化的策略有:

  • JSQ: Join-the-Shortest-Queue,每次将任务加入最少任务队列的 server。这就要求 dispatcher 收集每个 processor server 的任务队列大小信息,但是随着 dispatcher 本身的集群化以及云服务厂商的大规模应用,这个收集产生的网络通讯就更加膨胀了。
  • SQ(d)(Power-of-d):每当任务到达的时候, dispatcher 就随机取样 d 个 processor 服务的队列大小,选择最小任务队列的那个派发。通常 d 选择为 2。这个算法相比随机算法能带来响应时间指数级别的提升。但是仍然需要在分发任务的时候获取 processor 队列信息,这个同步调用在任务派发的关键路径上,对性能有很大影响。
  • 工作窃取和共享:工作窃取就是空闲队列主动去从其他任务队列『窃取』任务,或者繁忙的队列主动将任务『推送』给其他空闲队列。这个算法更适合共享内存的系统,对于 web 负载均衡,在不同后端 server 之间做任务的窃取或者推送会带来额外的开销和复杂度,想象一个 web http 请求如何转交到另一台后端 server,涉及 TCP 链接的迁移和请求的同步等等。

JIQ 全称就是 Join-Idle-Queue,它的提出就是为了解决大规模 Web Services 的负载均衡问题。

算法描述

JIQ 顾名思义就是将任务加入空闲的队列。空闲队列的数据结构称之为 I-Queue(idle queue)。JIQ 将算法描述为两部分:primary load balancing 和 secondary load balancing,两者之间就通过 I-Queue 数据结构通讯。

  • 首要负载均衡(primary load balancing):每当一个任务到达, Dispatcher 就检查 I-Queue 是否为空。如果不为空,就移除第一个空闲的 processor,将任务交给它。如果为空,就采用随机选择的一个 processor 派发任务。
  • 次级负载均衡(secondary load balancing):是指空闲的 processor 选择加入哪一个 I-Queue 的过程。这里讨论了两种策略,其实是前面讨论的负载均衡策略的翻版:
    • JIQ-Random,也就是随机策略,Idle Processor 随机选择一个 I-Queue 来加入。
    • JIQ-SQ(d),Idle Processor 随机取样 d 个 I-Queue,选择最小长度的来加入。

一张图来描述就是这样:

JIQ 的好处很明显:

  • 在派发任务的关键路径上移除了 Dispatcher 和 Processor 同步通讯的开销,通过 I-Queue 这个队列结构异步解耦了。
  • 同时可以证明 JIQ 带来的复杂度没有超过 SQ(d) 算法(有数学证明),并且在响应时间的降低上极大地超过了 SQ(d) 算法。

论文里给的一张测试结果,在7 种不同的响应时间分布模型上做对比,比较 SQ(d) 和 JIQ 的响应时间(越小越好)消耗:

PS 和 FIFO 是指任务的执行策略,PS 是 Process-Sharing 策略,来一个跑一个,而 FIFO 就是先入先出策略。R 和 S 开头分别表示采用的是 JIQ-Random 还是 JIQ-SQ(d) 的次级负载均衡策略,后面的数字表示 Processor 和 I-Queue 的比例, 比如 R40 就是采用 JIQ-Random 次级负载均衡策略,同时 processor 数量是 I-Queue 数量的 40 倍。

从结果来看,JIQ 比 SQ(d) 算法在降低响应时间上的结果非常好。其次是 JIQ-SQ(d) 比 JIQ-Random 的总体表现也更好。

论文里有相当多的数学计算来分析整个模型,这一块基本没看懂,有兴趣的地自行研读。

算法扩展

当在非常高负载(At very high load)的情况下,processor 可以在『轻』负载的时候就报告给 I-Queue 说自己现在是空闲状态。比如,processor 当自己的任务队列只有一个任务的时候,将自己加入 I-Queue,在完成任务后再加入一次。这样就可以在 I-Queue 里增加了一份 processor 的拷贝,变相地增加了任务到 Idle Processor 的到达率。

当报告的阈值设置为 2,性能的对比如下, JIQ 仍然表现很好:

Clojure Under a Microscope(1):Clojure 如何理解代码(下)

| Comments

继续上篇,本篇的目的是将 parse 过程介绍完成,然后我们就可以进入编译和运行环节。

目录:

LispReader 补充

上篇在介绍 LispReader 源码核心片段的时候没有介绍最后一个比较关键的代码片段:

1
2
      String token = readToken(r, (char) ch);
      return interpretToken(token);

interpretToken 方法将去解析字符串 token 的含义,token 就是一个词汇单元,它的含义是什么,完全由 interpretToken 决定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static private Object interpretToken(String s) {
  if(s.equals("nil"))
      {
      return null;
      }
  else if(s.equals("true"))
      {
      return RT.T;
      }
  else if(s.equals("false"))
      {
      return RT.F;
      }
  Object ret = null;

  ret = matchSymbol(s);
  if(ret != null)
      return ret;

  throw Util.runtimeException("Invalid token: " + s);
}

代码其实很清楚,token 可能是:

  • nil
  • true
  • false
  • symbol

symbol 再次说明下,你可以暂时理解为 Java 的变量 identifier,指代某个值。

P.S. LispReader 用了相当多的相当复杂的正则表达式来匹配整数、symbol、分数等,这一块也许有性能优化的空间。

Analyze

我们在上一篇提到, LispReader 它只负责解析出一个可以被求值的合法的 form,进行一些初步的语法规则检查,至于这个 form 更进一步是否符合语义,它并不关心。

比如前文举例了 (read-string "(if true 1 2 3)"),它可以被 LispReader 读出来,但是其实是不符合 if 这个 special form 的要求的,因为它只接受两个或者三个参数。

更进一步的检查是放在 clojure.lang.Compiler 中的,这个类也是 Clojure 代码编译成 JVM 所理解的字节码并交由 JVM 运行的核心。

具体地说,是 Compiler 类中的 analyze 方法,它会分析 LispReader 读出来的 form,并转化可以求值的 Expr。如图:

Expr 是一个 Java 接口,它有各种各样的子类,接口本身寥寥几个方法:

1
2
3
4
5
6
7
8
9
interface Expr{
  Object eval() ;

  void emit(C context, ObjExpr objx, GeneratorAdapter gen);

  boolean hasJavaClass() ;

  Class getJavaClass() ;
}

其中:

  • eval 用于对自身求值,得出表达式的结果。
  • emit用于编译,这个编译特指 Clojure 的编译,它会生成 Java 的 class 文件。
  • hasJavaClassgetJavaClass 是为了编译环节做优化引入的,暂时不聊。

关于 evalemit 我们放到下一篇文章的编译再讲。这里还是聚焦在 Compiler 如何 analyze 出 Expr。

analyze 的核心仍然是一个 50 多行的 if else 派发,它会检查传入的 form 的类型,然后根据类型进入更具体的 analyze 方法,比如 (if true 1 2 3) 这是一个 Sequence,那么会进入 analyzeSeq 方法:

1
2
3
4
......
else if(form instanceof ISeq)
              return analyzeSeq(context, (ISeq) form, name);
......             

analyzeSeq 中,它会先判断 form 的第一个元素(first)是否是 fn ,也就是一个函数,如果不是,那么会去检查下是否是 specail form,这里也是一个查表法:

1
2
3
4
5
6
7
8
9
   Object op = RT.first(form);

   IParser p;
  if(op.equals(FN))
      return FnExpr.parse(context, form, name);
  else if((p = (IParser) specials.valAt(op)) != null)
      return p.parse(context, form);
  else
      return InvokeExpr.parse(context, form);

最后,如果不是 fn ,也不是 special forms,那么可能是一个调用(比如函数调用,宏调用,Java method 调用等),那就进入 InvokeExprparse 过程。

special forms 都定义在 specials map 里了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static final public IPersistentMap specials = PersistentHashMap.create(
      DEF, new DefExpr.Parser(),
      LOOP, new LetExpr.Parser(),
      RECUR, new RecurExpr.Parser(),
      IF, new IfExpr.Parser(),
      CASE, new CaseExpr.Parser(),
      LET, new LetExpr.Parser(),
      LETFN, new LetFnExpr.Parser(),
      DO, new BodyExpr.Parser(),
      FN, null,
      QUOTE, new ConstantExpr.Parser(),
      THE_VAR, new TheVarExpr.Parser(),
      IMPORT, new ImportExpr.Parser(),
      DOT, new HostExpr.Parser(),
      ASSIGN, new AssignExpr.Parser(),
      DEFTYPE, new NewInstanceExpr.DeftypeParser(),
      REIFY, new NewInstanceExpr.ReifyParser(),
//       TRY_FINALLY, new TryFinallyExpr.Parser(),
TRY, new TryExpr.Parser(),
THROW, new ThrowExpr.Parser(),
MONITOR_ENTER, new MonitorEnterExpr.Parser(),
MONITOR_EXIT, new MonitorExitExpr.Parser(),
//       INSTANCE, new InstanceExpr.Parser(),
//       IDENTICAL, new IdenticalExpr.Parser(),
//THISFN, null,
CATCH, null,
FINALLY, null,
//       CLASS, new ClassExpr.Parser(),
NEW, new NewExpr.Parser(),
//       UNQUOTE, null,
//       UNQUOTE_SPLICING, null,
//       SYNTAX_QUOTE, null,
_AMP_, null
);

可以看到核心的 special forms 其实是非常少的,所以我一直不理解被人说 clojure 语法复杂。其实它的核心语法就这么 20 几个 special forms。

每个 special form 都对应一个 IParser 的子类:

1
2
3
interface IParser{
  Expr parse(C context, Object form) ;
}

实现其中的 parse 方法,将 form 解析成对应的 Expr 子类。if 对应的就是 IfExpr.ParserIfExpr 了。

我们来画张 UML 图总结下这块结构:

每个 Expr 子类都有个内部类 Parser,它实现了 IParser 接口的 parse 方法,然后解析出对应的外部类这个 Expr 对象。

下面我们来分析一两个 Parser

Parser 举例: if

IfExpr.Parser 为例子,去除一些我们暂时不关心的内容之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Expr parse(C context, Object frm) {
    ISeq form = (ISeq) frm;
     //(if test then) or (if test then else)
    if(form.count() > 4)
          throw Util.runtimeException("Too many arguments to if");
    else if(form.count() < 3)
          throw Util.runtimeException("Too few arguments to if");

    Expr testexpr = analyze(context == C.EVAL ? context : C.EXPRESSION, RT.second(form));
    Expr thenexpr, elseexpr;

    try {
    thenexpr = analyze(context, RT.third(form));
    }finally{......}

    try {
    elseexpr = analyze(context, RT.fourth(form));
    } finally{......}
    return new IfExpr(lineDeref(),
                        columnDeref(),
                        testexpr,
                        thenexpr,
                        elseexpr);
}

这逻辑实在太直白了,所以我注释都没加:

  1. 检查下整个 form 的元素数目是不是在 3 个或者 4 个(为什么不是 2 或者 3?因为if 本身是 form 的第一个元素)。如果参数不对,就报错。
  2. 调用 analyze 递归分析 test 『语句』,也就是 form 的第二个元素。
  3. 调用 analyze 继续分析 then 和 else 『语句』。
  4. 最后,将分析后的结果生成一个 IfExpr 对象返回。

Parser 举例:fn

我们再来分析一个稍微复杂点的,比如 fn,fn 有两种格式:

1
2
3
4
;;定义函数,没有其他重载函数。
(fn name? [params* ] condition-map? exprs*)
;;定义函数,多个重载函数分支,每个都是 ([params* ] condition-map? exprs*) 的格式
(fn name? ([params* ] condition-map? exprs*)+)

前者也是先在 FnExpr.Parser 内转成后者的:

1
2
3
4
//now (fn [args] body...) or (fn ([args] body...) ([args2] body2...) ...)
//turn former into latter
if(RT.second(form) instanceof IPersistentVector)
    form = RT.list(FN, RT.next(form));

然后对 form 做迭代,每个重载分支称为一个 FnMethod,调用 FnMethod.Parser 来解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 for (ISeq s = RT.next(form); s != null; s = RT.next(s)) {
            FnMethod f = FnMethod.parse(fn, (ISeq) RT.first(s), fn.isStatic);
            if (f.isVariadic()) {
                if (variadicMethod == null)
                    variadicMethod = f;
                else
                    throw Util.runtimeException("Can't have more than 1 variadic overload");
            }
            else if (methodArray[f.reqParms.count()] == null)
                methodArray[f.reqParms.count()] = f;
            else
                throw Util.runtimeException("Can't have 2 overloads with same arity");
            if (f.prim != null)
                prims.add(f.prim);
        }

解析出来的 FnMethod 会根据它的参数个数加到数组 methodArray 里。这个数组的大小是 21 个,也就是说任何函数的重载分支不能超过 21 个,需要更多参数的,请使用可变参数 [& args]

其中 variadicMethod 会指向其中的可变参数分支的 FnMethodf.isVariadic 返回真),但是会检查这个分支的参数个数必须比其他分支的参数个数多:

1
2
3
4
for(int i = variadicMethod.reqParms.count() + 1; i <= MAX_POSITIONAL_ARITY; i++)
  if(methodArray[i] != null)
      throw Util.runtimeException(
              "Can't have fixed arity function with more params than variadic function");

举个例子:

1
2
user=> (fn ([a] 1) ([a b c] 3) ([a & args] :variadic))
CompilerException java.lang.RuntimeException: Can't have fixed arity function with more params than variadic function

FnMethod Parser

FnMethod 指代一个函数的重载分支,它的解析也很直白,先解析参数列表:

1
2
3
4
5
6
7
8
9
10
11
12
FnMethod method = new FnMethod(......);
......
PersistentVector argLocals = PersistentVector.EMPTY;
for(int i = 0; i < parms.count(); i++)
{
  if(!(parms.nth(i) instanceof Symbol))
      throw new IllegalArgumentException("fn params must be Symbols");
  Symbol p = (Symbol) parms.nth(i);
  ......
  argLocals = argLocals.cons(lb);
}
method.argLocals = argLocals;

参数一定是 symbol,所有参数收集到一个 PersistentVector 里。

如果有可变参数符号 &,那么就有 restParam:

1
2
3
4
5
6
7
8
static final Symbol _AMP_ = Symbol.intern("&");
......
if(p.equals(_AMP_)){
......
   state = PSTATE.REST;
......
   case REST:
      method.restParm = lb;

然后前文提到的 isVariadic 就是判断有没有 restParm:

1
2
3
4
   //是否是可变参数分支
  boolean isVariadic(){
      return restParm != null;
  }

然后解析函数 body:

1
2
method.body = (new BodyExpr.Parser()).parse(C.RETURN, body);
return method;

解析 Body 的 BodyExpr 又是一个递归调用 analyze 的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 public Expr parse(C context, Object frms) {
      ISeq forms = (ISeq) frms;
      //可能是 (do form1 form2 ...),取 next 部分。
      if(Util.equals(RT.first(forms), DO))
          forms = RT.next(forms);
      PersistentVector exprs = PersistentVector.EMPTY;
      //遍历 body form 列表,收集结果到 exprs vector
      for(; forms != null; forms = forms.next())
          {
          Expr e = (context != C.EVAL &&
                    (context == C.STATEMENT || forms.next() != null)) ?
                   analyze(C.STATEMENT, forms.first())
                                                                      :
                   analyze(context, forms.first());
          exprs = exprs.cons(e);
          }
      //如果 body 为空,返回 nil
      if(exprs.count() == 0)
          exprs = exprs.cons(NIL_EXPR);
      // 否则返回 BodyExpr
      return new BodyExpr(exprs);
  }

Primitive 参数性能优化

细心的朋友可能还注意上面 FnExpr 遍历重载分支的时候有个 prims 的链表,它会将 fn.prime 不为 null 的加进去。它是干什么的呢?

1
2
 if (f.prim != null)
   prims.add(f.prim);

其实这里是 Clojure 编译器为了 long/double 类型的参数避免类型转化和 box/unbox 引入的性能优化。prim 就是 primitive 的意思。当参数的 type hint 包含 double 或者 long 类型的并且参数个数小于4个的时候,生成的 Java 字节码方法将直接传入 long 或者 double 原生类型参数,而不是一般的 Object 类型,避免了转型和装箱拆箱。

FnMethod 相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
method.prim = primInterface(parms);

static public char classChar(Object x){
      Class c = null;
      if(x instanceof Class)
          c = (Class) x;
      else if(x instanceof Symbol)
          c = primClass((Symbol) x);
      if(c == null || !c.isPrimitive())
          return 'O';
      //如果是 long 类型,返回 L 字符
      if(c == long.class)
          return 'L';
      // 如果是 double 类型,返回 D 字符
      if(c == double.class)
          return 'D';
      throw new IllegalArgumentException("Only long and double primitives are supported");
}

static public String primInterface(IPersistentVector arglist) {
      StringBuilder sb = new StringBuilder();
      //拼接所有参数类型字符
      for(int i=0;i<arglist.count();i++)
          sb.append(classChar(tagOf(arglist.nth(i))));
      sb.append(classChar(tagOf(arglist)));
      String ret = sb.toString();
      //如果包含了 D 或者 L,并且参数个数小于 4 个,就认为是一个 prim 分支。
      boolean prim = ret.contains("L") || ret.contains("D");
      if(prim && arglist.count() > 4)
          throw new IllegalArgumentException("fns taking primitives support only 4 or fewer args");
      //特殊命名
      if(prim)
          return "clojure.lang.IFn$" + ret;
      return null;
  }

Clojure 编译器会为每个函数分支都生成一个 invoke 方法。 举例来说,对于方法 (fn [^long a ^String b] :nothing),如果没有这个优化,生成的 Java 方法大概是这样:

1
2
3
4
5
public Object invoke(Object a, Object b){
    long a  = (Long) a;
    String b = (String) b;
    return Keyword.intern("nothing");
}

对于参数 a 来说,为了转化成 long 原生类型需要经过转型和拆箱两个调用,这对于性能敏感的场景是一个损耗。有了这个优化, invoke 方法就可以是这样:

1
2
3
4
public Object invokePrim(long a, Object b){
    String b = (String) b;
    return Keyword.intern("nothing");
}

有兴趣地还可以阅读下这篇博客 warn-on-boxed。这个优化对于数值计算的性能有很大作用,从测试来看,有一倍的提升。

总结

clojure.lang.Compileranalyze 进一步将 LispReader 读出来的 form 解析成了 Expr,等待进一步的求值或者编译成 Java Class 文件。analyze 过程也是一个递归下降解释器的实现,整体实现并不复杂。

结合前文和本篇博客,我们可以给 Clojure 编译器初步画一张流程图,只考虑求值过程,暂不考虑 compile 函数:

接下来,我们即将进入求值和编译环节。

最后附赠一个 Compiler 所有 Expr 子类的 UML 图(单独打开放大):

微信 phxpaxos 源码解读:fsync 和 fdatasync

| Comments

最近在读微信开源的 paxos 实现 phxpaxos,读到 localstorage 部分学习到 fdatasync 系统调用。这一部分是非常核心的存储模块,参与者的状态信息、变更日志等都要写入磁盘并且可能要求强制刷入存储磁盘避免系统崩溃等情况下数据丢失,性能的很大一部分因素取决于这一块的实现。

phxpaxos 使用了 LevelDB 做存储,但是做了几个优化:

  • LevelDB 存储的 value 只是一个 24 个字节的 fileid 索引(有一个例外 minchosen 值),真正的状态数据存储在他们自己实现的 log_store 里, fileid 存储了在 log_store 里的文件编号、写入 vfile 的offset 和 checksum 这三个字段信息。这样做的原因是由于 leveldb 对于比较大的 value 的存取效率不是最优,通过间接一层存储,利用了 LevelDB 的 Key 的有序性和小 value 存储的性能优势,加上定制的 log_store 的写入优化,达到最优组合。
  • log_store 按照文件编号固定增长,比如 1.f2.f3.f ……以此类推(在日志目录的 vfile 目录下)。并且每个文件都是在创建的时候分配固定大小,默认 100M(还有所谓 large buffer 模式分配的是 500M)。值的写入都是 append 操作,写入完成后,将偏移量、校验和、当前文件 ID 再写入 LevelDB。读取的时候就先从 LevelDB 得到这三个信息,然后去对应的 vfile 读取实际的值。因为文件是固定大小分配,每次强制刷盘就不需要调用 fsync,而是采用 fdatasync,这样就无需做两次刷盘,因为 fdatasync 不会去刷入文件的元信息(比如大小、最后访问时间、最后修改时间等),而 fsync 是会的。

一张图来展示大概是这样:

注意到图中还有个 metafile,它存储了 log store 的当前的文件编号这个元信息,写满一个 vfile 就递增下这个计数,并更新到 metafile,为了保证不丢,对它的每次修改都需要强制 fsync。

回到文本的主题 fsync 和 fdatasync,按照 manual 的描述, fsync 同时刷入数据和元信息

1
2
3
4
fsync() transfers ("flushes") all modified in-core data of (i.e., modified
 buffer cache pages for) the file referred to by the file descriptor fd to 
 the disk device ...... It also flushes metadata information associated 
 with the file (see stat(2)).

而 fdatasync 如无必要,只刷入数据:

1
2
3
 fdatasync()  is  similar  to  fsync(),  but  does  not flush modified 
 metadata unless that metadata is needed in order to allow a subsequent 
 data retrieval to be correctly handled.

log_store 的实现因为提前分配了固定大小的文件,因此无需在刷盘的时候再去写入元信息,就可以使用 fdatasync 来优化刷盘性能,而访问时间和修改时间之类的元信息丢失对业务没有影响。

Clojure Under a Microscope(1): Clojure 如何理解代码(上)

| Comments

开篇

最近在读《Ruby Under a Microscope》(已经有中文版《Ruby 原理剖析》)。我很喜欢这本书介绍 Ruby 语言实现的方式,图文并茂,娓娓道来,不是特别深入,但是给你一个可以开始学习 Ruby 源码的框架和概念。

我大概在 2-3 年前开始阅读 Clojure Java 实现的源码,陆陆续续也有一些心得,想着可以在读 Ruby 这本书的时候,按照这本书的思路梳理一遍。因此就有了这第一篇: Clojure 如何理解代码。

目录:

IO Reader

我们抛开 leiningen 等构建工具,Clojure 唯一需要的是 JVM 和它的 jar 包,运行一段简单的 clojure 代码,可以这样:

1
2
$ java -cp clojure.jar  clojure.main -e "(println (+ 2 2))"
4

clojure.main 是所有 clojure 程序的启动入口,关于启动过程,后续会有单独一篇博客来介绍。-e 用来直接执行一段传入的 clojure 代码。

当 clojure 读到 (println (+ 2 2)) 这么一行代码的时候,它看到的是一个字符串。接下来它会将这段字符串拆成一个一个字符来读入,也就是

1
( p r i n t l n   ( +   2   2 ) )

这么一个字符列表。这是通过 java.io.PushBackReader 来完成。 Clojure 内部封装了一个 LineNumberingPushbackReader 的类继承了 PushbackReader ,并且内部封装了 Java 标准库的 LineNumberReader 来支持记录代码行列号(为了调试、报错、记录元信息等目的),并且最重要的是支持字符的回退(unread),它可以将读出来的字符『吐』回去,留待下次再读。内部其实就是一个回退字符缓冲区。

我们来试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
(def r
  (-> "(println (+ 2 2))"
      (java.io.StringReader.)
      (clojure.lang.LineNumberingPushbackReader.)))

(.read r)            ; => 40  '('
(.read r)            ; => 112 'p'
(.read r)            ; => 114 'r'
(.unread r 114)      ; 『吐』回 'r' 
(.read r)            ; => 114 'r'   
(.read r)            ; => 105 'i' 
(.getLineNumber r)   ;  获取行号,从 1 开始
(.getColumnNumber r) ;  获取列号,从 0 开始
......

read 返回的的是字符串的整数编码(0 – 65535),Clojure 默认使用的是 UTF-8 编码。查看一个字符的整数编码可以 int 强制转换:

1
2
(int \()  ; => 40
(int \你) ; => 20320

上面的例子中我们 unread 了 114(也就是字符 ‘r’),然后下次调用 read,返回的仍然是 114。Clojure 的词法解析需要依赖这个回退功能。

此外还可以通过 getLineNumbergetColumnNumber 获取代码的行号和列号。这个行列信息最终会在 Clojure 对象的 metadata 里,比如我们看下 + 这个函数的行列信息:

1
2
user=> (select-keys (meta #'clojure.core/+) [:column :line :file])
{:column 1, :line 965, :file "clojure/core.clj"}

LispReader

单个字符是没有意义,接下来,Clojure 需要理解这些字符组成的字符串是个什么东西,理解了之后才能去执行求值。

这个『东西』,在 Clojure 里定义为 form。form 其实不是 clojure 特有的概念,而应该说是 lisp 系的语言都有一个概念。form 该怎么理解呢? 粗糙地理解,它是 Clojure 的对象,对应了一种 clojure 数据类型。更精确地说,form 是一个可以被正常求值的『程序单元』。

form 可以是:

  • Literals 字面量,比如字符、字符串、数字、nil、true/false 布尔值等等。
  • Symbol 符号,可以先简单地理解成类似 Java 的变量名称 identifier。
  • Lists 括号括起来的列表,如 (a b c)
  • Vectors 这是 clojure 有别于其他 lisp 方言的地方,中括号括起来的列表 [1 2 3]
  • Maps 散列表 {:a 1 :b 2}
  • Sets/Map namespace(1.9 新增)、deftype、defrecord 等其他类型。

那么 Clojure 是怎么将上面 reader 读到的字符流理解成 form 的呢?这是通过 LispReader 来完成,他负责将字符流解析成 form。我们尝试调用它的 read 方法来读取下 "(println (+ 2 2))",看看结果是什么:

1
2
3
4
5
 (def r
  (-> "(println (+ 2 2))"
      (java.io.StringReader.)
      (clojure.lang.LineNumberingPushbackReader.)))
 (def form (clojure.lang.LispReader/read r nil))

查看下 form:

1
2
3
4
user=> form
(println (+ 2 2))
user=> (class form)
clojure.lang.PersistentList

这个 form 的『样子』和它的文本字符串是一模一样的 (println (+ 2 2)),可是它不是字符串了,而是一个 List —— Clojure 的数据结构也是最重要的数据结构。这个一模一样就是所谓的同像性,也就是 Homoiconicity。因为 form 其实就是 AST,(println (+ 2 2)) 是一个层次的嵌套结构,转换成树形如下:

image

对应的刚好也是语法树,那么同像性就赋予我们操作这棵语法树的能力,因为它本质上就是一个普通的 Clojure 『对象』,也就是 form。我们可以随心所欲的操作这个 form,这也是 Clojure 强大的元编程能力的基础。

如果对应到编译原理, LispReader 不仅是 Lexer,同时也是 Parser。除了读取解析出词法单元之外,还会检查读取的结果是否是一个合法的可以被求值的 form,比如我们故意少一个括号:

1
2
user=> (read-string  "(+ 1 2")
RuntimeException EOF while reading  clojure.lang.Util.runtimeException (Util.java:221)

read-string 和另一个函数 read 最终调用的还是 LispReader,因为少了个括号,它会报错,这不是一个合法的 form。

Clojure 的编译器是 one-pass 还是 two-pass?

编译器可以多遍扫描源码,做分词、解析、优化等等工作。那么 Clojure 编译器是几遍?

严格来讲, Clojure 的编译器是 two-pass 的,但是很多情况下都是 one-pass 的。

但是 pass 这个概念在 clojure 里不是特别合适,按照 Rich Hickey 的答复,Clojure 的编译器更多是按照一个一个编译单元来描述更合适。每个单元是一个顶层(toplevel) form。

比如你有一个 clojure 代码文件:

1
2
3
(def a 1)
(def b 2)
(println (+ 1 2))

clojure 编译器会认为这里有三个顶层编译单元,分别是 (def a 1)(def b 2)(println (+ 1 2)),这三个编译单元都是最顶层的 form,它们会按照在文件中的出现顺序一一编译。

正因为编译单元要按照这个顺序,因此其实 clojure 不支持循环引用,或者前向查找(但是特别提供了 declare):

1
2
(def b 2)
(println (+ a b))

第二个 form 将报错,因为找不到 a:

1
 Unable to resolve symbol: a in this context

请注意,前向查找跟多少遍扫描没有关系,一遍扫描也可以实现前向查找。Clojure 这里的选择是基于两个理由:编译性能和名称冲突考虑。参见这个 YC 上的回复

LispReader 实现

LispReader 的实现是一个典型的递归下推机,往前读一个字符,根据这个字符的类型通过一系列 if 语句判断要执行哪一段解析,完整代码在 github,核心的循环代码精简如下,并加上注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
for(; ;){
           //读取到一个 List,返回。
          if(pendingForms instanceof List && !((List)pendingForms).isEmpty())
              return ((List)pendingForms).remove(0);

          //读一个字符
          int ch = read1(r);

          //跳过空白,注意,逗号也被认为是空白
          while(isWhitespace(ch))
              ch = read1(r);

          //读到末尾
          if(ch == -1)
              {
              if(eofIsError)
                  throw Util.runtimeException("EOF while reading");
              return eofValue;
              }

          //读到设定的返回字符,提前返回。
          if(returnOn != null && (returnOn.charValue() == ch)) {
              return returnOnValue;
          }

          //可能是数字
          if(Character.isDigit(ch))
              {
              Object n = readNumber(r, (char) ch);
              return n;
              }

          //根据字符,查找 reader 表,走入更具体的解析
          IFn macroFn = getMacro(ch);
          if(macroFn != null)
              {
              Object ret = macroFn.invoke(r, (char) ch, opts, pendingForms);
              //no op macros return the reader
              if(ret == r)
                  continue;
              return ret;
              }
          //如果是正负符号,进一步判断可能是数字
          if(ch == '+' || ch == '-')
              {
              //再读一个字符
              int ch2 = read1(r);
              //如果是数字
              if(Character.isDigit(ch2))
                  {
                  //先回退 ch2 ,继续调用 readNumber 读出数字。
                  unread(r, ch2);
                  Object n = readNumber(r, (char) ch);
                  return n;
                  }
              //不是数字,回退 ch2
              unread(r, ch2);
              }
          //读取 token,并解析
          String token = readToken(r, (char) ch);
          return interpretToken(token);
          }
}

LispReader 维护了一个字符到 reader 的映射,专门用于读取特定的 form,也就是上面 getMacro 用到的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   static IFn[] macros = new IFn[256]; //特殊宏字符到 Reader 函数的映射
  macros['"'] = new StringReader();  // 双引号开头的使用字符串Reader
  macros[';'] = new CommentReader();  // 注释
  macros['\''] = new WrappingReader(QUOTE); // quote 
  macros['@'] = new WrappingReader(DEREF);// deref符号 @
  macros['^'] = new MetaReader();   //元数据
  macros['`'] = new SyntaxQuoteReader(); // syntax quote
  macros['~'] = new UnquoteReader();   // unquote
  macros['('] = new ListReader();      //list 
  macros[')'] = new UnmatchedDelimiterReader();  //括号不匹配
  macros['['] = new VectorReader();   //vector
  macros[']'] = new UnmatchedDelimiterReader();  // 中括号不匹配
  macros['{'] = new MapReader();     // map
  macros['}'] = new UnmatchedDelimiterReader();  // 大括号不匹配
  macros['\\'] = new CharacterReader();   //字符,如\a
  macros['%'] = new ArgReader();   // 匿名函数便捷记法里的参数,如%, %1
  macros['#'] = new DispatchReader();  // #下面将提到的 dispatch macro
  
  static private IFn getMacro(int ch){
    if(ch < macros.length)
        return macros[ch];
    return null;
   }

ListReader 实现解析

我们先看下 ListReader,它是一个普通的 Clojure 函数,继承 AFn,并实现了 invoke 调用方法,关于 Clojure 的对象或者说运行时模型,我们后文再谈,ListReader 核心的代码如下:

1
2
3
List list = readDelimitedList(')', r, true);
IObj s = (IObj) PersistentList.create(list);
return s;

调用了 readDelimitedList 获取了一个 List 列表,然后转换成 Clojure 的 PersistentList 返回。readDelimitedList 的处理也很容易理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//收集结果
ArrayList a = new ArrayList();

for(; ;)
  {
  int ch = read1(r);
  //忽略空白
  while(isWhitespace(ch))
      ch = read1(r);
  //非法终止
  if(ch == -1)
      {
      if(firstline < 0)
          throw Util.runtimeException("EOF while reading");
      else
          throw Util.runtimeException("EOF while reading, starting at line " + firstline);
      }
  //读到终止符号,也就是右括号),停止
  if(ch == delim)
      break;
  //可能是macro fn
  IFn macroFn = getMacro(ch);
  if(macroFn != null)
      {
      Object mret = macroFn.invoke(r, (char) ch);
      //no op macros return the reader
      
      //macro fn 如果是no op,返回reader本身
      if(mret != r)
          //非no op,加入结果集合
          a.add(mret);
      }
  else
      {
      //非macro,回退ch
      unread(r, ch);
      //读取object并加入结果集合
      Object o = read(r, true, null, isRecursive);
      //同样,根据约定,如果返回是r,表示null
      if(o != r)
          a.add(o);
      }
  }
//返回收集的结果集合

return a;

再举一个例子,MetaReader,用于读取 form 的元信息。

MetaReader 解析

Clojure 可以为每个 form 附加上元信息,例如:

1
2
user=> (meta (read-string "^:private (+ 2 2)"))
{:private true}

通过 ^:private,我们给 (+ 2 2) 这个 form 设置了元信息 private=true。当 LispReader 读到 ^ 字符的时候,它从 macros 表找到 MetaReader,然后使用它来继续读取元信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
   //meta对象,可能是map,可能是symbol,也可能是字符串,例如(defn t [^"[B" bs] (String. bs))
   Object meta = read(r, true, null, true);
  //symbol 或者 字符串,就是简单的type hint tag
  if(meta instanceof Symbol || meta instanceof String)
      meta = RT.map(RT.TAG_KEY, meta);
  //如果是keyword,证明是布尔值的开关变量,如 ^:dynamic ^:private
  else if (meta instanceof Keyword)
      meta = RT.map(meta, RT.T);
  //如果连 map 都不是,那很抱歉,非法的meta数据
  else if(!(meta instanceof IPersistentMap))
      throw new IllegalArgumentException("Metadata must be Symbol,Keyword,String or Map");

  //读取要附加元数据的目标对象
  Object o = read(r, true, null, true);
  if(o instanceof IMeta)
      //如果可以附加,那么继续走下去
      {
      if(line != -1 && o instanceof ISeq)
          {
          //如果是ISeq,加入行号,列号
          meta = ((IPersistentMap) meta).assoc(RT.LINE_KEY, line).assoc(RT.COLUMN_KEY, column);
          }
      if(o instanceof IReference)
          {
          //如果是 ref,重设 meta
          ((IReference)o).resetMeta((IPersistentMap) meta);
          return o;
          }
      //增加 meta 到原有的 ometa
      Object ometa = RT.meta(o);
      for(ISeq s = RT.seq(meta); s != null; s = s.next()) {
      IMapEntry kv = (IMapEntry) s.first();
      ometa = RT.assoc(ometa, kv.getKey(), kv.getValue());
      }
      //关联到o
      return ((IObj) o).withMeta((IPersistentMap) ometa);
      }
  else
      //不可附加元素,抱歉,直接抛出异常
      throw new IllegalArgumentException("Metadata can only be applied to IMetas");

从代码里可以看到,不是所有 form 都可以添加元信息的,只有实现 IMeta 接口的 IObj 才可以,否则将抛出异常:

1
2
user=> ^:private 3
IllegalArgumentException Metadata can only be applied to IMetas  clojure.lang.LispReader$MetaReader.invoke (LispReader.java:820)

Dispatch Macros

Clojure 同时还支持 # 字符开始的所谓 dispatch macros,比如正则表达式 #"abc" 或者忽略解析的 #_(form)。这部分的解析也是查表法:

1
2
3
4
5
6
7
8
9
dispatchMacros['^'] = new MetaReader();  //元数据,老的形式 #^
dispatchMacros['\''] = new VarReader();   //读取var,#'a,所谓var-quote
dispatchMacros['"'] = new RegexReader();  //正则,#"[a-b]"
dispatchMacros['('] = new FnReader();    //匿名函数快速记法 #(println 3)
dispatchMacros['{'] = new SetReader();   // #{1} 集合
dispatchMacros['='] = new EvalReader();  // eval reader,支持 var 和 list的eval
dispatchMacros['!'] = new CommentReader();  //注释宏, #!开头的行将被忽略
dispatchMacros['<'] = new UnreadableReader();   // #< 不可读
dispatchMacros['_'] = new DiscardReader();   //#_ 丢弃

LispReader 读到 # 字符的时候,会从 macros 表找到 DispatchReader,然后在 DispatchReader 内部继续读取一个字符,去 dispatchMacros 找到相应的 reader 进行下一步解析。

更多 Reader 源码解析,可以参考我的注解,或者自行研读。

本篇总结

一张图来总结本篇所介绍的内容:

reader

Clojure 在从文件或者其他地方读取到代码文本后,交给 IO Reader 拆分成字符,然后 LispReader 将字符流解析成可以被求值的 form。

我们前面提到 LispReader 同时是 Lexer 和 Parser,但是它并不是完整意义上的 Parser,比如它不会去检查 if 的使用是否合法:

1
2
3
4
user=> (read-string "(read-string "(if 1 2 3 4)")")
(if 1 2 3 4)
user=> (if 1 2 3 4)
CompilerException java.lang.RuntimeException: Too many arguments to if, compiling:(NO_SOURCE_PATH:93:1)

LispReader 只会检查它是否是一个合法的 form,而不会去检查它的语义是否正确,更进一步的检查需要 clojure.lang.Compiler 介入了,它会执行一个 analyze 解析过程来检查,这是下一篇要讲的内容。

Clojure 并发实践:使用 pmap 加速程序

| Comments

LeanCloud 的控制台会展示一个应用列表,应用列表会展示该用户的所有应用,以及每个应用的基本信息,例如总用户数、昨天请求量和本月请求量等。我们最多允许每个用户创建 50 个应用。伪代码大概是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
(defn add-app-info
  "添加应用统计信息。"
  [app]
  (assoc app
         :yesterday_reqs (count-reqs app 7)
         :monthly_reqs (count-reqs app 30)
         :total_users (count-users app)))

(defn get-client-apps
  "获取用户的应用列表"
  [client_id]
  (->> client_id
       (db/find-apps-by-client-id)
       (map add-app-info)))

显然,这里每个应用为了获取这些请求信息,都至少要请求三次。虽然这些统计请求本身已经有了缓存,但是假设有 50 个应用(实际中,部分开发者的应用数量包括协作应用在内会更多),那就需要发起 150 个请求,这个过程如果完全串行处理的话,假设 add-app-info 的开销至少是 1~3 毫秒,串行处理下来也需要 50~150 毫秒,加上传输的时间,那么用户的体验的就相当差了。

这时候,我们可以用并发处理来加速了,你只需要替换一个函数,将 get-client-appsmap 替换为 pmap 即可:

1
2
3
4
5
6
(defn get-client-apps
   "获取用户的应用列表"
  [client_id]
  (->> client_id
       (db/find-apps-by-client-id)
       (pmap add-app-info)))

关于 pmap 的讨论参见 并发函数pmap、pvalues和pcalls。因为 pmap 对于 chunked sequnce 的处理是批量处理,因此最多同时使用 32 个并发任务在处理,这个线程数量在这个场景下是可以接受的。加速后的性能也可以估算出来 (Math/round (/ n 32.0)) x (1~3) 毫秒。 在实际线上环境中,大概加速了 3~4 倍左右。

在测量性能的时候,注意使用 doall,因为 clojure 的 LazySeq 特性会干扰你的测试。

一个函数替换就能获得并发加速,抽象的力量在这里体现的淋漓尽致。

Xmemcached 死锁分析和 Aviator 可变参数方法实现

| Comments

首先是 xmemcached 发了 2.2.0 版本,最重要解决的问题就是请求超时。详细的情况可以参考这个 issue 。推荐所有还在用 xmc 的朋友升级到这个版本,性能和稳定性都有所改进。

这个 bug 的原因可能更值得说道说道。

xmemcached 本身会对发出去的请求维护一个队列,在 onMessageSent 也就是消息写到 socket 后将请求放入队列,然后在收到 memcached 返回应答的时候,找出当前的请求来 decode 应答内容。伪代码是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
//Handler 里加入队列。
public void onMessageSent(Command msg, Session session){
    session.getQueue().offer(msg);
}

//Decoder 里做解码
public Command decode(ByteBuffer buf, Session session){
    Command cmd = session.getQueue().take().decode(buf);
    if(cmd!=null)
        return cmd;
    else
        return null;
}

这个 Bug 的关键就在于加入队列的时候和 take 的使用。 take 会阻塞当前操作,直到队列中有可用的元素或者被中断。而我们放入队列的时候是在命令被完全写入 socket 之后(有没有发出去,你无法确认的,因为有 socket 缓冲区、网卡缓冲区的存在)。其次是这两段逻辑是发生在同一个处理线程上。

那么当用户写入一个超过 1M 的数据的时候,假设是 2M。因为 memcached 最多只允许保存 1M 大小的数据,当 xmemcached 将超过 1M 但是还没有达到 2M 的数据发送到 memcached 后, memcached 立即应答返回错误。但是此时,数据还没有完全写出去,导致命令没有被加入队列,同时 take 也取不到数据,我们遇到了死锁: take 在等待命令加入,而写入命令数据的线程被 take 阻塞了没有机会继续写

找到问题解决就很简单了: 将放入队列的时机从完全写入后,放到开始写第一个字节之前的时候;或者将 take 改成 poll,不阻塞当前线程,当没有可用命令的时候会去重试。我选择了前者的方案。因为 memcached 的典型场景是读多写少,更希望尽快地 decode 出结果来响应。

Aviator 的自定义函数是继承 AbstractFunction,然后复写特定参数 arity 的方法实现即可。但是后来有用户发现无法处理超过 20 个参数的可变参数。回顾下代码,确实是没有处理这个逻辑。当超过 20 个参数的时候,应该将多余的参数封装成数组来调用。

Java 里的可变参数就是数组,一个简单例子:

1
2
3
4
5
6
7
8
public class Test{
    public static void method(int a, int ...b){
    }

    public static void main(String []args){
        method(1, 2, 3, 4);
    }
}

编译后 javap -v Test 看看字节码,关键是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
     0: iconst_1
     1: iconst_3
     2: newarray       #int,创建数组
     4: dup
     5: iconst_0
     6: iconst_2
     7: iastore        #将 2 放入数组
     8: dup
     9: iconst_1
    10: iconst_3
    11: iastore        #将 3 放入数组
    12: dup
    13: iconst_2
    14: iconst_4
    15: iastore        #将4放入数组
    16: invokestatic  #2                  // Method method:(I[I)V

最终调用的方法签名是 (I[I)V,也就是 void (int a, int [] b)

Aviator 的 Parser 是 one pass 的,解析的同时生成字节码,就是一个典型的递归下降解释器。当遇到方法调用,因为是一遍扫描,在完全解析完整个方法调用之前,我是不知道有多少个参数的,因此就不知道应该创建多大的额外参数数组。

遇到这种不知道多少元素将加入的集合的问题,那肯定不能用数组了,直接用 List 吧。基本的实现逻辑也很简单,当解析到第 20 个参数,就创建一个 List 实例,后面再解析到的参数就加入这个 List。到解析方法调用完成后,将前面的 20 个参数,加上 list 里面的元素组成的数组,一起做个调用,伪代码类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void onMessageParam(Token token){
    if(getEnv().getParamsNumber()) > 20){
        List list = getEnv().getOrCreateParamList();
        list.add(token);
    }
    onTernary(token);
}

public void onMessageInvoke(Token token){
    List list = getEnv().getOrCreateParamList();
    AviatorObject [] extraParams = new AviatorObject[list.size()];
    extraParams = list.toArray(extraParams);

    ......
    function.call(param1, param2, ......., extraParams);
}

当然实际的代码是用 ASM 直接写成字节码的。

Clojure 并发实践: future 和 promise 处理异步返回值

| Comments

Clojure 的并发方面的详细介绍可以参考我过去总结的 wiki —— Clojure 并发。 这次又想写个系列,介绍下实际编程中对这些并发机制的应用。

不过,很可能不会涉及 STM。 LeanCloud 本质上是一个 web 型的应用,基础的并发模型已经由 web server 和后端存储决定了,STM 的适应场景没有出现过。

这一篇先从 future 和 promise 开始。

最近处理这么一个任务,有一段业务代码要调用一个第三方接口来查询域名备案号,但是呢,这个第三方接口非常不稳定,经常查询出错或者超时,导致这个业务经常不可用。

1
2
3
(defn query-icp [domain]
     ;; HTTP 调用第三方接口 API 。
     (query-icp-from-thirdparty1 domain))

为了提高这个接口的稳定性,我们引入另一个查询服务,想让两个服务来竞争,谁能返回正常结果,就用谁的。假设这个服务封装成了函数 query-icp-from-thirdparty2

ok,我们先加个 or 上去

1
2
3
4
(defn query-icp [domain]
  (or
    (query-icp-from-thirdparty1 domain)
    (query-icp-from-thirdparty2 domain)))

先尝试从一个服务查询,如果没有返回就尝试第二个服务。

但是这样有个问题,第三方服务的调用我们是一定要设置一个超时的。这个 or 改动我们改变了 query-icp 的超时承诺,原来最多等待 query-icp-from-thirdparty1 超时,现在可能遇到最高两倍的超时时间(假设两个服务都遇到超时),因为两个是顺序调用的,这肯定是不能接受的。

第一时间想到,我们将查询并发,启个线程去同时去查询两个服务,这时候就可以用 future。其次,任何一个服务如果有结果返回,我们就使用它,不等另一个服务的结果。在 Java 里我们可以用 CountDownLatch 或者 CompletionService。 在 Clojure 里我们可以用 promise + deliver。

1
2
3
4
5
6
7
8
9
10
(defn- do-query-icp [p f domain]
  (future
    (when-let [ret (f domain)]
      (deliver p ret))))

(defn query-icp [domain]
  (let [p (promise)]
    (do-query-icp p query-icp-from-thirdparty1 domain)
    (do-query-icp p query-icp-from-thirdparty2 domain)
    (deref p :5000 nil)))

do-query-icp 里我们利用 future 来异步调用接口,当接口有返回的时候,使用 deliver 将结果 ret 喂给 promise

而在 query-icp 里,我们先创建一个 promise,然后接连发起两次 do-query-icp 异步请求分别调用两个服务,然后利用 (deref p timeout-ms timeout-val) 等待结果,同时设置超时 5 秒和超时后的返回值 nil

当然,实质上, clojure 的 promise 也是基于 CountDownLatch 实现。