庄周梦蝶

生活、程序、未来

声明:本博客所有文章,未经允许,禁止转载。谢谢。

Xmemcached 死锁分析和 Aviator 可变参数方法实现

| Comments

首先是 xmemcached 发了 2.2.0 版本,最重要解决的问题就是请求超时。详细的情况可以参考这个 issue 。推荐所有还在用 xmc 的朋友升级到这个版本,性能和稳定性都有所改进。

这个 bug 的原因可能更值得说道说道。

xmemcached 本身会对发出去的请求维护一个队列,在 onMessageSent 也就是消息写到 socket 后将请求放入队列,然后在收到 memcached 返回应答的时候,找出当前的请求来 decode 应答内容。伪代码是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
//Handler 里加入队列。
public void onMessageSent(Command msg, Session session){
    session.getQueue().offer(msg);
}

//Decoder 里做解码
public Command decode(ByteBuffer buf, Session session){
    Command cmd = session.getQueue().take().decode(buf);
    if(cmd!=null)
        return cmd;
    else
        return null;
}

这个 Bug 的关键就在于加入队列的时候和 take 的使用。 take 会阻塞当前操作,直到队列中有可用的元素或者被中断。而我们放入队列的时候是在命令被完全写入 socket 之后(有没有发出去,你无法确认的,因为有 socket 缓冲区、网卡缓冲区的存在)。其次是这两段逻辑是发生在同一个处理线程上。

那么当用户写入一个超过 1M 的数据的时候,假设是 2M。因为 memcached 最多只允许保存 1M 大小的数据,当 xmemcached 将超过 1M 但是还没有达到 2M 的数据发送到 memcached 后, memcached 立即应答返回错误。但是此时,数据还没有完全写出去,导致命令没有被加入队列,同时 take 也取不到数据,我们遇到了死锁: take 在等待命令加入,而写入命令数据的线程被 take 阻塞了没有机会继续写

找到问题解决就很简单了: 将放入队列的时机从完全写入后,放到开始写第一个字节之前的时候;或者将 take 改成 poll,不阻塞当前线程,当没有可用命令的时候会去重试。我选择了前者的方案。因为 memcached 的典型场景是读多写少,更希望尽快地 decode 出结果来响应。

Aviator 的自定义函数是继承 AbstractFunction,然后复写特定参数 arity 的方法实现即可。但是后来有用户发现无法处理超过 20 个参数的可变参数。回顾下代码,确实是没有处理这个逻辑。当超过 20 个参数的时候,应该将多余的参数封装成数组来调用。

Java 里的可变参数就是数组,一个简单例子:

1
2
3
4
5
6
7
8
public class Test{
    public static void method(int a, int ...b){
    }

    public static void main(String []args){
        method(1, 2, 3, 4);
    }
}

编译后 javap -v Test 看看字节码,关键是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
     0: iconst_1
     1: iconst_3
     2: newarray       #int,创建数组
     4: dup
     5: iconst_0
     6: iconst_2
     7: iastore        #将 2 放入数组
     8: dup
     9: iconst_1
    10: iconst_3
    11: iastore        #将 3 放入数组
    12: dup
    13: iconst_2
    14: iconst_4
    15: iastore        #将4放入数组
    16: invokestatic  #2                  // Method method:(I[I)V

最终调用的方法签名是 (I[I)V,也就是 void (int a, int [] b)

Aviator 的 Parser 是 one pass 的,解析的同时生成字节码,就是一个典型的递归下降解释器。当遇到方法调用,因为是一遍扫描,在完全解析完整个方法调用之前,我是不知道有多少个参数的,因此就不知道应该创建多大的额外参数数组。

遇到这种不知道多少元素将加入的集合的问题,那肯定不能用数组了,直接用 List 吧。基本的实现逻辑也很简单,当解析到第 20 个参数,就创建一个 List 实例,后面再解析到的参数就加入这个 List。到解析方法调用完成后,将前面的 20 个参数,加上 list 里面的元素组成的数组,一起做个调用,伪代码类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void onMessageParam(Token token){
    if(getEnv().getParamsNumber()) > 20){
        List list = getEnv().getOrCreateParamList();
        list.add(token);
    }
    onTernary(token);
}

public void onMessageInvoke(Token token){
    List list = getEnv().getOrCreateParamList();
    AviatorObject [] extraParams = new AviatorObject[list.size()];
    extraParams = list.toArray(extraParams);

    ......
    function.call(param1, param2, ......., extraParams);
}

当然实际的代码是用 ASM 直接写成字节码的。

声明:本博客所有文章,未经允许,禁止转载。谢谢。

Java, Xmemcached, 开源

« Clojure 并发实践: future 和 promise 处理异步返回值 Clojure 并发实践:使用 pmap 加速程序 »

Comments