标签 C 下的文章

Memcached CAS应用一例

近期收到客户一个需求,我将该需求转述为下面这个等价的问题。

【问题】

    * 有一个产品包装系统S,为某种产品P提供产品包装服务;

    * 系统S由若干个处理节点组成,每个节点都可以单独处理组件;

    * 产品P的一个可出厂的成品由包装盒+N个产品组件组成,包装盒与产品组件上都贴有一个标签,该标签上包含该成品的唯一编号ID(一定时间范围内有效)、每个组件自己的序号(unit-num)以及成品的组件总个数(unit-total)。每个成品只有一个包装盒,该包装盒的组件序号为0。其中unit-num <= unit_total == N <= 32;

    * 某个成品的诸多组件是乱序到达S并由S送到产品包装工位的;当系统S第一次接收到一个成品的某个组件时,S会将一个包装盒贴上该组件对应的成品ID,并将其放在传送带上,传送给对应的组装工位;当系统S接收到同一成品的其他组件时,不再重新发放包装盒了;

    * 系统S具有剔除冗余组件的功能,如果某个成品的某个组件(序号为n)已经被S接收并送到指定包装位,后续若再出现同一成品的相同序号组件(可能是因为标签贴错导致),S将会将该冗余组件剔除出包装线;

    * 当某个成品的最后一个组件被S处理后,该成品的ID即告无效了,可以被后续成品重复使用了。

【解决思路】

这个问题中有几个关键功能点:

    * 每个成品只分配一个包装盒;

    * 支持剔重;

    * 当最后一个组件被处理后,成品ID被从系统中删除,可被后续成品重复使用。

这是一个典型的多个节点并发操作的一致性问题,我们初步考虑基于开源的MemcachedCAS服务去解决该问题,解决思路如下:

    a) S系统中的某个节点收到某成品的某个组件(unit_num = n)后,以ID为Key尝试获取成品的Value(以及item_cas值);如果索引尚未在系统建立,那么创建索引,以ID为Key,Value为一整型字符串,初值为1<<(n-1);并分配包装盒;

    b) 如果以成品ID为Key的索引已经建立,系统节点将组件的(1<<n)与Value进行“与操作”以判断该组件是否为重复组件,如果为1,则为重复组件;否则以(Value +  1 << (n-1))的值以及获得的item_cas发起cas操作;

    c) 如果cas操作成功,则数一下((Value +  1 << (n-1)) 中置位(=1)的bit个数,如果个数==unit-total,则删除索引;否则继续处理下一个组件;
        如果cas操作失败,则回到步骤a)。

【Demo代码】

/* pack_sys.c */

… …
#include <libmemcached/memcached.h>

static const char *product_id = "nexus5";
static const int component_in_total = 5;
static const int component_order[] = {2, 3, 1, 2, 5, 4};

//code from <Algorithms.for.Programmers.Ideas.and.Source.Code>
static inline unsigned long long
bit_count(unsigned long long x)
{
    x = (0x5555555555555555UL & x) + (0x5555555555555555UL & (x >> 1));
    x = (0x3333333333333333UL & x) + (0x3333333333333333UL & (x >> 2));
    x = (0x0f0f0f0f0f0f0f0fUL & x) + (0x0f0f0f0f0f0f0f0fUL & (x >> 4));
    x = (0x00ff00ff00ff00ffUL & x) + (0x00ff00ff00ff00ffUL & (x >> 8));
    x = (0x0000ffff0000ffffUL & x) + (0x0000ffff0000ffffUL & (x >> 16));
    x = (0x00000000ffffffffUL & x) + (0x00000000ffffffffUL & (x >> 32));
    return x;
}

int
main(int argc, char *argv[])
{
    memcached_st *memc;
    memcached_return_t rc = MEMCACHED_SUCCESS;
    memcached_server_st *server = NULL;

    memc = memcached_create(NULL);
    if (NULL == memc) {
        printf("memcached_create error\n");
        return -1;
    }

    … …

    rc = memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, 1);
    if (rc != MEMCACHED_SUCCESS) {
        printf("memcached_behavior_set support cas error: %s\n",
                memcached_strerror(memc, rc));
        return -1;
    }

    /* pack the component one by one */
    int ret = 0;
    int i = 0;
    for (i = 0; i < sizeof(component_order)/sizeof(component_order[0]); i++) {
        ret = pack_component(memc, component_order[i]);
        if (ret == 0) {
            printf("pack component [%d] ok\n”, component_order[i]);
        } else if (ret == 1) {
            printf("pack component [%d] exists\n”, component_order[i]);
        } else {
            printf("other error occurs\n");
            return -1;
        }
        getchar();
    }

    return 0;
}

int
pack_component(memcached_st *memc, int i)
{
    memcached_return_t rc = MEMCACHED_SUCCESS;

    uint32_t mask = 1 << (i – 1);
    uint32_t value_added = 1 << (i – 1);
    char value_added_str[11] = {0};
    uint32_t value = 0;
    char *pvalue = NULL;
    size_t value_len = 0;
    uint32_t flags = 0;

    while(1) {
        pvalue = memcached_get(memc, product_id, strlen(product_id),
                               &value_len, &flags, &rc);
        if (!pvalue) {
            if (rc == MEMCACHED_NOTFOUND) {
                printf("componet [%d] – memcached_get not found product key: [%s]\n",
                       i, product_id);
                memset(value_added_str, 0, sizeof(value_added_str));
                sprintf(value_added_str, "%u", value_added);
                rc = memcached_add(memc, product_id, strlen(product_id), value_added_str,
                                   strlen(value_added_str), 1000, 0);
                if (rc == MEMCACHED_DATA_EXISTS) {
                    printf("componet [%d] – memcached_add key[%s] exist\n", i, product_id);
                    pvalue = memcached_get(memc, product_id, strlen(product_id),
                                           &value_len, &flags, &rc);
                    if (!pvalue) return -1;
                } else if (rc != MEMCACHED_SUCCESS) {
                    printf("componet [%d] – memcached_add error: %s, [%d]\n",
                            i, memcached_strerror(memc, rc), rc);
                    return -1;
                } else {
                    printf("componet [%d] – memcached_add key[%s] successfully,"
                           " its value = %u, cas = %llu\n",
                            i,product_id,
                            value_added, (memc->result).item_cas);
                    return 0;
                }
            } else {
                printf("componet [%d] – memcached_get error: %s, %d\n",
                       i, memcached_strerror(memc, rc), rc);
                return -1;
            }
        }

        value = atoi(pvalue);
        printf("componet [%d] – memcached_get value = %u, cas = %llu\n",
                i, value, (memc->result).item_cas);

        if (value & mask) {
            free(pvalue);
            return 1;
        } else {
            uint64_t cas_value = 0;
            cas_value = (memc->result).item_cas;
            memset(value_added_str, 0, sizeof(value_added_str));
            sprintf(value_added_str, "%d", value_added + value);

            rc = memcached_cas(memc, product_id, strlen(product_id),
                               value_added_str, strlen(value_added_str),
                               1000, 0, cas_value);
            if (rc != MEMCACHED_SUCCESS) {
                printf("componet [%d] -  memcached_cas error = %d,  %s\n",
                        i, rc, memcached_strerror(memc, rc));
                free(pvalue);
            } else {
                printf("componet [%d] -  memcached_cas ok\n", i);
                free(pvalue);
                if (bit_count(value_added + value) == component_in_total) {
                    rc = memcached_delete(memc, product_id, strlen(product_id), 0);
                    if (rc != MEMCACHED_SUCCESS) {
                        printf("memcached_delete error: %s\n",
                                memcached_strerror(memc, rc));
                        return -1;
                    } else {
                        printf("memcached_delete key: %s ok\n", product_id);
                    }
                }
                return 0;

            }
        }
        getchar();
    }

    return 0;
}

代码看起来较多,主要是要考虑各种异常情况。

我们可以通过先后启动两个pack_sys来验证程序逻辑的正确性:

窗口1:
$> pack_sys
componet [2] – memcached_get not found product key: [nexus5]
componet [2] – memcached_add key[nexus5] successfully, its value = 2, cas = 0
pack component [2] ok

窗口2:
$> pack_sys
componet [2] – memcached_get value = 2, cas = 54
pack component [2] exists

若两个窗口继续交替执行,一种可能的结果如下:

窗口1:

$> pack_sys
componet [2] – memcached_get not found product key: [nexus5]
componet [2] – memcached_add key[nexus5] successfully, its value = 2, cas = 0
pack component [2] ok

componet [3] – memcached_get value = 2, cas = 54
componet [3] -  memcached_cas ok
pack component [3] ok

componet [1] – memcached_get value = 6, cas = 55
componet [1] -  memcached_cas ok
pack component [1] ok

componet [2] – memcached_get value = 23, cas = 57
pack component [2] exists

componet [5] – memcached_get not found product key: [nexus5]
componet [5] – memcached_add key[nexus5] successfully, its value = 16, cas = 0
pack component [5] ok

componet [4] – memcached_get value = 16, cas = 59
componet [4] -  memcached_cas ok
pack component [4] ok

窗口2:

$> pack_sys
componet [2] – memcached_get value = 2, cas = 54
pack component [2] exists

componet [3] – memcached_get value = 7, cas = 56
pack component [3] exists

componet [1] – memcached_get value = 7, cas = 56
pack component [1] exists

componet [2] – memcached_get value = 7, cas = 56
pack component [2] exists

componet [5] – memcached_get value = 7, cas = 56
componet [5] -  memcached_cas ok
pack component [5] ok

componet [4] – memcached_get value = 23, cas = 57
componet [4] -  memcached_cas ok
memcached_delete key: nexus5 ok
pack component [4] ok

全部Demo代码已经上传到github上了,感兴趣可以去下载。

【其它】

* 我用的是libmemcached 1.0.17版本,memcached 1.4.15版本。

* libmemcached启用cas后,只能在ascii模式下工作,在binary下会得到如下错误,应该是libmemcached的bug;

     memcached_cas error,  SERVER END, 21 

* libmemcached的官方文档中某些内容似乎已经落伍了,与代码的实际行为已经不一致了,参考manual的时候要小心,最好能对着源码看。

* 关于问题调试,可以考虑通过-vv命令行选项打开memcached的详细日志,这样你就可以看到memcached的一举一动,特别是涉及到binary protocol时,这样调试更有效率。

为阻塞型函数调用添加超时机制

我们产品中的一个子模块在进行Oracle实时数据库查询时,常常因数据库性能波动或异常而被阻塞在OCI API的调用上,为此我们付出了“惨痛”的代价。说来说去还是我们的程序设计的不够完善,在此类阻塞型函数调用方面缺少微小粒度的超时机制。

调用阻塞多发生在I/O操作(磁盘、网络、低速设备)、第三方API调用等方面。对于文件/网络I/O操作,我们可利用在非阻塞文件描述符上select /poll的超时机制来替代针对阻塞型文件描述符的系统调用;但在第三方API方面,多数时候是无法用select/poll来进行超时的,我们可以选择 另外一种方法:利用setjmp和longjmp的非局部跳转机制来为特定阻塞调用添加超时机制。其原理大致是:利用定时器(alarm、setitimer)设置超时时间,在SIGALRM的handler中利用longjmp跳到阻塞型调用之前,达到超时跳出阻塞型函数调用的效果。同时这种方法通用性更好些。

这个机制实现起来并不难,但有些细节还是要考虑周全,否则很容易出错。我们的产品是需要运行在LinuxSolaris两个平台下的,因此机制的实现还要考虑移植性的问题。下面简要说说在实现这一机制过程中出现的一些问题与解决方法。

一、第一版

考虑到阻塞型函数的原型各不相同,且我们的产品中对阻塞调用有重试次数的要求,因此打算将这个机制包装成一个,大致是这个模样:

#define add_timeout_to_func(func, n, interval, ret, …) \…

其中func是函数名;n是重试的次数;interval是超时的时间,单位是秒;ret是函数成功调用后的返回值,若失败,也是这个宏的返回值。

我们可以像下面这样使用这个宏:

/* example.c */
int
main()
{
    #define MAXLINE 1024
    char line[MAXLINE];

    int ret = 0;
    int try_times = 3;
    int interval = 1000;
    add_timeout_to_func(read, try_times, interval, ret, STDIN_FILENO, line, MAXLINE);
    if (ret == E_CALL_TIMEOUT) {
        printf("invoke read timeouts for 3 times\n");
        return -1;
    } else if (ret == 0) {
        printf("invoke read ok\n");
        return 0;
    } else {
        printf("add_timeout_to_func error = %d\n", ret);
    }
}

add_timeout_to_func中为阻塞型函数添加的超时机制是利用setjmp/longjmp与信号的处理函数合作完成的。

/* timeout_wrapper.h */
 

#include <setjmp.h>
#include <stdarg.h>
#include <unistd.h>
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <errno.h>

extern volatile int invoke_count;
extern jmp_buf invoke_env;

void timeout_signal_handler(int sig);
typedef void (*sighandler_t)(int);
#define E_CALL_TIMEOUT (-9)

#define add_timeout_to_func(func, n, interval, ret, ...) \
    { \
        invoke_count = 0; \
        sighandler_t h = signal(SIGALRM, timeout_signal_handler); \
        if (h == SIG_ERR) { \
            ret = errno; \
            goto end; \
        }  \
\
        if (sigjmp(invoke_env) != 0) { \
            if (invoke_count >= n) { \
                ret = E_CALL_TIMEOUT; \
                goto err; \
            } \
        } \
\
        alarm(interval);\
        ret = func(__VA_ARGS__);\
        alarm(0); \
err:\
        signal(SIGALRM, h);\
end:\
        ;\
    }

/* timeout_wrapper.c */
#include "timeout_wrapper.h"

volatile int invoke_count = 0;
jmp_buf invoke_env;

void
timeout_signal_handler(int sig)
{
    invoke_count++;
    longjmp(invoke_env, 1);
}

编译运行这个程序,分别在Solaris、Linux下运行,遗憾的是两个平台下都以失败告终。

先说一下在Linux下的情况。在Linux下,程序居然不响应第二次SIGALRM信号了。通过strace也可以看出,当alarm被第二次调用后, 系统便阻塞在了read上,没有实现为read增加超时机制的目的。原因何在呢?我在《The Linux Programming Interface》一书中找到了原因。原因大致是这样的,我们按照代码的执行流程来分析:

* add_timeout_to_func宏首先设置了信号的handler,保存了env信息(setjmp),调用alarm设置定时器,然后阻塞在read调用上;
* 1s后,定时器信号SIGALRM产生,中断发生,代码进入信号处理程序,即timeout_signal_handler; Linux上的实现是当进入处理程序时,内核会自动屏蔽对应的信号(SIGALRM)以及此时act.sa_mask字段中的所有信号;在离开 handler后,内核取消这些信号的屏蔽。
* 问题在于我们是通过longjmp调用离开handler的,longjmp对应的invoke_env是否在setjmp时保存了这些被屏蔽的信号呢? 答案是:在Linux上没有。这样longjmp跳到setjmp后也就无法恢复对SIGALRM的屏蔽;当再次产生SIGALRM信号时,程序将无法处 理,也就一直阻塞在read调用上了。

解决方法:将setjmp/longjmp替换为sigsetjmp和siglongjmp,后面这组调用在sigsetjmp时保存了屏蔽信号,这样在 siglongjmp返回时可以恢复到handler之前的信号屏蔽集合,也就是说SIGALRM恢复自由了。在Solaris 下,setjmp/longjmp是可以恢复被屏蔽的信号的。

再说说在Solaris下的情况。在Solaris下,程序在第二次SIGALRM到来之际,居然退出了,终端上显示:“闹钟信号”。这是因为在 Solaris下,通过signal函数设置信号的处理handler仅是一次性的。在应对完一次信号处理后,信号的handler被自动恢复到之前的处 理策略设置,对于SIGALRM来说,也就是程序退出。解决办法:通过多次调用signal设置handler或通过sigaction来长效设置 handler。考虑到移植性和简单性,我们选择了sigaction。在Linux平台下,signal函数底层就是用sigaction实现的,是简洁版的sigaction,因此它的设置不是一次性的,而是长效的。

二、第二版

综上问题的修改,我们有了第二版代码。

/* timeout_wrapper.h */

extern volatile int invoke_count;
extern sigjmp_buf invoke_env;

void timeout_signal_handler(int sig);
typedef void sigfunc(int sig);
sigfunc *my_signal(int signo, sigfunc* func);
#define E_CALL_TIMEOUT (-9)

#define add_timeout_to_func(func, n, interval, ret, …) \
    { \
        invoke_count = 0; \
        sigfunc *sf = my_signal(SIGALRM, timeout_signal_handler); \
        if (sf == SIG_ERR) { \
            ret = errno; \
            goto end; \
        }  \
\
        if (sigsetjmp(invoke_env, SIGALRM) != 0) { \
            if (invoke_count >= n) { \
                ret = E_CALL_TIMEOUT; \
                goto err; \
            } \
        } \
\
        alarm(interval); \
        ret = func(__VA_ARGS__);\
        alarm(0); \
err:\
        my_signal(SIGALRM, sf); \
end:\
        ;\
    }

/* timeout_wrapper.c */

volatile int invoke_count = 0;
sigjmp_buf invoke_env;

void
timeout_signal_handler(int sig)
{
    invoke_count++;
    siglongjmp(invoke_env, 1);
}

sigfunc *
my_signal(int signo, sigfunc *func)
{
    struct sigaction act, oact;

    act.sa_handler = func;
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    if (signo == SIGALRM) {
#ifdef SA_INTERRUPT
        act.sa_flags |= SA_INTERRUPT;
#endif
    } else {
#ifdef SA_RESTART
        act.sa_flags |= SA_RESTART;
#endif
    }
    if (sigaction(signo, &act, &oact) < 0)
        return SIG_ERR;
    return oact.sa_handler;
}

这里从《Unix高级环境编程》中借了一段代码,就是那段my_signal的实现。这样修改后,程序在Linux和Solaris下工作都蛮好的。但目前唯一的缺点就是超时时间粒度太大,alarm仅支持秒级定时器,我们至少要支持毫秒级,接下来我们要换掉alarm。

三、第三版

setitimer与alarm是同出一门,共享一个定时器的。不同的是setitimer可以支持到微秒级的粒度,因此我们就用setitimer替换alarm,第三版仅改动了add_timeout_to_func这个宏:

#define add_timeout_to_func(func, n, interval, ret, …) \
    { \
        invoke_count = 0; \
        sigfunc *sf = my_signal(SIGALRM, timeout_signal_handler); \
        if (sf == SIG_ERR) { \
            ret = errno; \
            goto end; \
        }  \
\
        if (sigsetjmp(invoke_env, SIGALRM) != 0) { \
            if (invoke_count >= n) { \
                ret = E_CALL_TIMEOUT; \
                goto err; \
            } \
        } \
\
        struct itimerval tick;  \
        struct itimerval oldtick;  \
        tick.it_value.tv_sec = interval/1000; \
        tick.it_value.tv_usec = (interval%1000) * 1000; \
        tick.it_interval.tv_sec = interval/1000; \
        tick.it_interval.tv_usec = (interval%1000) * 1000; \
\
        if (setitimer(ITIMER_REAL, &tick, &oldtick) < 0) { \
            ret = errno; \
            goto err; \
        } \
\
        ret = func(__VA_ARGS__);\
        setitimer(ITIMER_REAL, &oldtick, NULL); \
err:\
        my_signal(SIGALRM, sf); \
end:\
        ;\
    }

至此,一个为阻塞型函数调用添加的超时机制的雏形基本实现完毕了,但要放在产品代码里还需要更细致的打磨。至少目前只是在单进程单线程中跑过,而且要求每个函数中只能调用add_timeout_to_func一次,否则就会有编译错误。

以上完整代码我都放到github上的experiments repository中了,有兴趣的朋友可以下载细看。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 Go语言编程指南
商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats