标签 分布式系统 下的文章

Memcached CAS应用一例

近期收到客户一个需求,我将该需求转述为下面这个等价的问题。

【问题】

    * 有一个产品包装系统S,为某种产品P提供产品包装服务;

    * 系统S由若干个处理节点组成,每个节点都可以单独处理组件;

    * 产品P的一个可出厂的成品由包装盒+N个产品组件组成,包装盒与产品组件上都贴有一个标签,该标签上包含该成品的唯一编号ID(一定时间范围内有效)、每个组件自己的序号(unit-num)以及成品的组件总个数(unit-total)。每个成品只有一个包装盒,该包装盒的组件序号为0。其中unit-num <= unit_total == N <= 32;

    * 某个成品的诸多组件是乱序到达S并由S送到产品包装工位的;当系统S第一次接收到一个成品的某个组件时,S会将一个包装盒贴上该组件对应的成品ID,并将其放在传送带上,传送给对应的组装工位;当系统S接收到同一成品的其他组件时,不再重新发放包装盒了;

    * 系统S具有剔除冗余组件的功能,如果某个成品的某个组件(序号为n)已经被S接收并送到指定包装位,后续若再出现同一成品的相同序号组件(可能是因为标签贴错导致),S将会将该冗余组件剔除出包装线;

    * 当某个成品的最后一个组件被S处理后,该成品的ID即告无效了,可以被后续成品重复使用了。

【解决思路】

这个问题中有几个关键功能点:

    * 每个成品只分配一个包装盒;

    * 支持剔重;

    * 当最后一个组件被处理后,成品ID被从系统中删除,可被后续成品重复使用。

这是一个典型的多个节点并发操作的一致性问题,我们初步考虑基于开源的MemcachedCAS服务去解决该问题,解决思路如下:

    a) S系统中的某个节点收到某成品的某个组件(unit_num = n)后,以ID为Key尝试获取成品的Value(以及item_cas值);如果索引尚未在系统建立,那么创建索引,以ID为Key,Value为一整型字符串,初值为1<<(n-1);并分配包装盒;

    b) 如果以成品ID为Key的索引已经建立,系统节点将组件的(1<<n)与Value进行“与操作”以判断该组件是否为重复组件,如果为1,则为重复组件;否则以(Value +  1 << (n-1))的值以及获得的item_cas发起cas操作;

    c) 如果cas操作成功,则数一下((Value +  1 << (n-1)) 中置位(=1)的bit个数,如果个数==unit-total,则删除索引;否则继续处理下一个组件;
        如果cas操作失败,则回到步骤a)。

【Demo代码】

/* pack_sys.c */

… …
#include <libmemcached/memcached.h>

static const char *product_id = "nexus5";
static const int component_in_total = 5;
static const int component_order[] = {2, 3, 1, 2, 5, 4};

//code from <Algorithms.for.Programmers.Ideas.and.Source.Code>
static inline unsigned long long
bit_count(unsigned long long x)
{
    x = (0x5555555555555555UL & x) + (0x5555555555555555UL & (x >> 1));
    x = (0x3333333333333333UL & x) + (0x3333333333333333UL & (x >> 2));
    x = (0x0f0f0f0f0f0f0f0fUL & x) + (0x0f0f0f0f0f0f0f0fUL & (x >> 4));
    x = (0x00ff00ff00ff00ffUL & x) + (0x00ff00ff00ff00ffUL & (x >> 8));
    x = (0x0000ffff0000ffffUL & x) + (0x0000ffff0000ffffUL & (x >> 16));
    x = (0x00000000ffffffffUL & x) + (0x00000000ffffffffUL & (x >> 32));
    return x;
}

int
main(int argc, char *argv[])
{
    memcached_st *memc;
    memcached_return_t rc = MEMCACHED_SUCCESS;
    memcached_server_st *server = NULL;

    memc = memcached_create(NULL);
    if (NULL == memc) {
        printf("memcached_create error\n");
        return -1;
    }

    … …

    rc = memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, 1);
    if (rc != MEMCACHED_SUCCESS) {
        printf("memcached_behavior_set support cas error: %s\n",
                memcached_strerror(memc, rc));
        return -1;
    }

    /* pack the component one by one */
    int ret = 0;
    int i = 0;
    for (i = 0; i < sizeof(component_order)/sizeof(component_order[0]); i++) {
        ret = pack_component(memc, component_order[i]);
        if (ret == 0) {
            printf("pack component [%d] ok\n”, component_order[i]);
        } else if (ret == 1) {
            printf("pack component [%d] exists\n”, component_order[i]);
        } else {
            printf("other error occurs\n");
            return -1;
        }
        getchar();
    }

    return 0;
}

int
pack_component(memcached_st *memc, int i)
{
    memcached_return_t rc = MEMCACHED_SUCCESS;

    uint32_t mask = 1 << (i – 1);
    uint32_t value_added = 1 << (i – 1);
    char value_added_str[11] = {0};
    uint32_t value = 0;
    char *pvalue = NULL;
    size_t value_len = 0;
    uint32_t flags = 0;

    while(1) {
        pvalue = memcached_get(memc, product_id, strlen(product_id),
                               &value_len, &flags, &rc);
        if (!pvalue) {
            if (rc == MEMCACHED_NOTFOUND) {
                printf("componet [%d] – memcached_get not found product key: [%s]\n",
                       i, product_id);
                memset(value_added_str, 0, sizeof(value_added_str));
                sprintf(value_added_str, "%u", value_added);
                rc = memcached_add(memc, product_id, strlen(product_id), value_added_str,
                                   strlen(value_added_str), 1000, 0);
                if (rc == MEMCACHED_DATA_EXISTS) {
                    printf("componet [%d] – memcached_add key[%s] exist\n", i, product_id);
                    pvalue = memcached_get(memc, product_id, strlen(product_id),
                                           &value_len, &flags, &rc);
                    if (!pvalue) return -1;
                } else if (rc != MEMCACHED_SUCCESS) {
                    printf("componet [%d] – memcached_add error: %s, [%d]\n",
                            i, memcached_strerror(memc, rc), rc);
                    return -1;
                } else {
                    printf("componet [%d] – memcached_add key[%s] successfully,"
                           " its value = %u, cas = %llu\n",
                            i,product_id,
                            value_added, (memc->result).item_cas);
                    return 0;
                }
            } else {
                printf("componet [%d] – memcached_get error: %s, %d\n",
                       i, memcached_strerror(memc, rc), rc);
                return -1;
            }
        }

        value = atoi(pvalue);
        printf("componet [%d] – memcached_get value = %u, cas = %llu\n",
                i, value, (memc->result).item_cas);

        if (value & mask) {
            free(pvalue);
            return 1;
        } else {
            uint64_t cas_value = 0;
            cas_value = (memc->result).item_cas;
            memset(value_added_str, 0, sizeof(value_added_str));
            sprintf(value_added_str, "%d", value_added + value);

            rc = memcached_cas(memc, product_id, strlen(product_id),
                               value_added_str, strlen(value_added_str),
                               1000, 0, cas_value);
            if (rc != MEMCACHED_SUCCESS) {
                printf("componet [%d] -  memcached_cas error = %d,  %s\n",
                        i, rc, memcached_strerror(memc, rc));
                free(pvalue);
            } else {
                printf("componet [%d] -  memcached_cas ok\n", i);
                free(pvalue);
                if (bit_count(value_added + value) == component_in_total) {
                    rc = memcached_delete(memc, product_id, strlen(product_id), 0);
                    if (rc != MEMCACHED_SUCCESS) {
                        printf("memcached_delete error: %s\n",
                                memcached_strerror(memc, rc));
                        return -1;
                    } else {
                        printf("memcached_delete key: %s ok\n", product_id);
                    }
                }
                return 0;

            }
        }
        getchar();
    }

    return 0;
}

代码看起来较多,主要是要考虑各种异常情况。

我们可以通过先后启动两个pack_sys来验证程序逻辑的正确性:

窗口1:
$> pack_sys
componet [2] – memcached_get not found product key: [nexus5]
componet [2] – memcached_add key[nexus5] successfully, its value = 2, cas = 0
pack component [2] ok

窗口2:
$> pack_sys
componet [2] – memcached_get value = 2, cas = 54
pack component [2] exists

若两个窗口继续交替执行,一种可能的结果如下:

窗口1:

$> pack_sys
componet [2] – memcached_get not found product key: [nexus5]
componet [2] – memcached_add key[nexus5] successfully, its value = 2, cas = 0
pack component [2] ok

componet [3] – memcached_get value = 2, cas = 54
componet [3] -  memcached_cas ok
pack component [3] ok

componet [1] – memcached_get value = 6, cas = 55
componet [1] -  memcached_cas ok
pack component [1] ok

componet [2] – memcached_get value = 23, cas = 57
pack component [2] exists

componet [5] – memcached_get not found product key: [nexus5]
componet [5] – memcached_add key[nexus5] successfully, its value = 16, cas = 0
pack component [5] ok

componet [4] – memcached_get value = 16, cas = 59
componet [4] -  memcached_cas ok
pack component [4] ok

窗口2:

$> pack_sys
componet [2] – memcached_get value = 2, cas = 54
pack component [2] exists

componet [3] – memcached_get value = 7, cas = 56
pack component [3] exists

componet [1] – memcached_get value = 7, cas = 56
pack component [1] exists

componet [2] – memcached_get value = 7, cas = 56
pack component [2] exists

componet [5] – memcached_get value = 7, cas = 56
componet [5] -  memcached_cas ok
pack component [5] ok

componet [4] – memcached_get value = 23, cas = 57
componet [4] -  memcached_cas ok
memcached_delete key: nexus5 ok
pack component [4] ok

全部Demo代码已经上传到github上了,感兴趣可以去下载。

【其它】

* 我用的是libmemcached 1.0.17版本,memcached 1.4.15版本。

* libmemcached启用cas后,只能在ascii模式下工作,在binary下会得到如下错误,应该是libmemcached的bug;

     memcached_cas error,  SERVER END, 21 

* libmemcached的官方文档中某些内容似乎已经落伍了,与代码的实际行为已经不一致了,参考manual的时候要小心,最好能对着源码看。

* 关于问题调试,可以考虑通过-vv命令行选项打开memcached的详细日志,这样你就可以看到memcached的一举一动,特别是涉及到binary protocol时,这样调试更有效率。

利用ZooKeeper服务实现分布式系统的配置数据同步

很多时候,一旦习惯了某些事情,也就习惯了它们的恶劣,习惯了它们的丑陋,习惯了它们“赋予”你的各种痛苦。
                                                                                                                                                      – Tony Bai

一、痼疾难解

曾几何时,在那个还没有集群化,没有分布式的时代,它还是一个不错的方案,至少在线上没有暴露出太多问题,它也不在我们关注的重点范围之内。但随 着集群化、分布式的新版本的到来,那一大坨遗留的代码就变得格外让人不顺眼,同时问题也随之在线上暴露开来了。

这里的“它”指的就是我们目前的业务配置数据同步方案。简单描述这个方案如下:

* 方案涉及两个角色 – 数据库(DB)与应用节点(app_node);
* 所有的业务配置数据均统一存储在DB中;
* 应用节点在启动后从DB中读取最新业务配置数据;
* 应用节点运行过程中,如果DB中的业务配置数据发生变更(增/删/改),DB中的触发器(trigger)将会执行。在触发器的脚本中,触发器将会【串 行】地与每个应用节点建立TCP链接,并将业务配置表的变更信息发给各个应用节点。 应用节点会接收并【解析】触发器发过来变更数据包,并同步到自己的本地内存中。这样就达到了运行时更新配置的目的。

上面我用【】标记了两个关键词:“串行”和“解析”。这两个词隐含有这个方案的两个主要问题。

“串行” – 意味着每一次DB的业务配置数据变更,trigger脚本都要逐个与应用节点建立链接并收发数据。当应用节点逐渐增多时,每一次业务数据同步都会相当地耗 时。尤其是当某个应用节点所在主机出现问题时,到该节点链接建立的过程会阻塞,导致整个业务配置数据同步的时间达到无法忍受的地步。

“解析” – 我们自定义了trigger与应用节点之间的协议包。协议包中包含了每次变更的详细信息,比如在某个表添加一条记录,trigger会将这个记录的每个字 段信息排成一行打包发给应用节点。应用节点收到这个包后,会根据已有的表字段信息对该包进行解析。看得出这是一个很强的耦合:表字段一旦修 改,trigger脚本要修改,应用节点的解析函数要修改,还要考虑协议包中表字段的排序。如果应用节点解析时与trigger脚本打包时的字段 顺序不同的话,那就可能出现严重错误,而且这种错误有时难于校验并难于发现。

二、曾经的努力

针对这个方案的不足,我们曾经也做过改进,但主要针对的是解决“串行”这个问题上。

第一次改进:同步的发起能否并行做?trigger脚本能否并行发起对各个应用节点的链接建立请求?

Java组同事对trigger脚本做了改进。让trigger脚本调用function,而function中又调用了写好的Java方 法,Java代码由DB加载到环境中。在Java方法中创建多个同步线程,并发与各应用节点建立链接并发送数据。这个方法的确可以变“串行”为 “并行”,但不知为何生产环境中实际运行时偶尔会出现异常,该异常发生在DB中,影响很大。有时还会导致DB的一些异常现象。至今原因尚未明确, 我们无奈退回到以前的方案。

第二次改进:从Push模式到Pull模式

在之前部门新规划的一个产品中,开发人员对数据同步的机制做了重新的设计,将原来的Push模式改为了Pull模式。大致方案是:
   
    * 业务数据变更时,trigger直接将变更内容(以老方案中那个协议包的打包格式)写到一个“变更日志表”中,每条记录有一个唯一的序号,序号递增。
    * 应用节点启动后,从DB加载最新配置信息,查询“变更日志表”,得到该表内最新的一条记录的序号n。
    * 应用节点以“轮询”的方式定期查询“变更日志表”,并读取和解析那些序号比序号n更新的记录;更新完后,将继续保存最新的一条记录序号。
    * 数据库中有job定期对“变更日志表”中的记录进行过期删除处理。

个人感觉第二个方案应该是理想方案的一个雏形,虽然目前它的同步更新可能不是那么及时,与DB交互过多(方案细节中每个应用节点在处理完一条记录 后还要更新记录的状态)。该方案设计者也完全也可以放弃那个导致耦合的协议包设计,但他最终还是选择保留了原有协议包解析函数。目前该方案在产品 环境下运行还算良好,并未暴露出什么问题。这算是一次有效的改进,也为本文中要提到的方案提供了一些思路启示。

三、与时俱进

ZooKeeper生来就具备解决分布式系统的配置分发和同步的能力。利用ZooKeeper服务实现分布式系统的统一配置中心已经不是那么新鲜 的话题了。最简单的模型莫过于将配置数据存储在ZooKeeper上的路径节点上,然后应用节点在这些配置节点上添加watch。当配置数据变更 时,每个应用节点都可以及时得到通知,同步到最新数据。这种模型对于一些量少简单的系统配置来说较为合适。对于我们每个表动辄上万条配置的情形似 乎不那么适合,想象一下每个应用节点要添加上万个watch,这对ZooKeeper而言也是压力山大啊。因此用ZooKeeper提供的诸多服 务如何来优化我们上面提到的两个主要问题呢?这里提出一种方案仅供参考。

方案示意图:

DB  —-> Config Center Services(css_agent + ZooKeeper)  —> App Node

在新方案中,我们要:
    保留 – 保留trigger脚本,作为业务数据变更的唯一的触发起点;
    摒弃 – 摒弃那个复杂的带来耦合的协议格式;
    借鉴 – 借鉴“Push -> Pull”的数据获取方式。

新方案中除了DB、应用节点(app_node)外,新增加了一个角色Config Center Services(缩写为ccs),ccs由ZooKeeper + ccs_agent的集群组成。简单起见,每个ZooKeeper节点上部署一个ccs_agent。这些角色之间的数据流和指令流关系,即该方案的原理 如下:

    * 初始化
        – ZooKeeper集群启动;
        – ccs_agent启动,利用ZooKeeper提供的leader election服务,选出ccs_agent leader。ccs_agent leader启动后负责在ZooKeeper中建立业务配置表node,比如:表employee_info_tab对应的node路径为“/ccs /foo_app/employee_info_tab”;
        – ccs_agent启动后会监听一个端口,用来接受DB trigger向其发起的数据链接;
        – 应用节点启动,监听ZooKeeper上所有(数量有限的)业务配置表node的child event;
   
    * 数据变更
        – DB中某业务表比如employee_info_tab增加了一条id为"1234567"的记录;
        – 触发器启动,向ccs_agent cluster中任意一个可用的节点建立链接,并将数据包“^employee_info_tab|ADD|1234567$"发送给 ccs_agent;
        – ccs_agent收取并解析trigger发来的数据包,在对应的/ccs/foo_app/employee_info_tab下建立ZOO_SEQUENCE类 型节点“item-000000000”,该节点的值为“ADD 1234567";
        – ZooKeeper将/ccs/foo_app/employee_info_tab节点的child事件发给所有watch该节点事件的应用节点;
        – 应用节点“取出”/ccs/foo_app/employee_info_tab节点下的children节点"item-000000000",并读取 其值,后续到DB的employee_info_tab中将id = 1234567的这条记录select出来,将该条记录更新到本地内存中。应用节点记录下处理过的当下节点id为"item-000000000";
        – DB业务表employee_info_tab又增加了两条记录,id分别为"7777777"和"8888888",经过上面描述的流程,/ccs /foo_app/employee_info_tab节点下会增加"item-000000001"和"item-000000002"两项; 应用节点最终会收到child事件通知。应用节点“取出”/ccs/foo_app/employee_info_tab节点下的所有 children节点并排序。之后,处理那些id号大于"item-000000000"的节点,并将当前节点id记录为“item- 000000002"。依次类推。

    * 过期处理
        – ccs_agent leader负责定期扫描ZooKeeper中/ccs下各个表节点下的子项,对于超出过期时间的item进行删除处理。

    * 应用节点重启
        -  应用节点重启后,会首先从db读取最新信息,并记录启动时间戳;
        -  应用节点重启后,在收到zookeeper的数据变更事件后,会根据当前时间戳与变更表节点下的item创建时间进行比较,并仅处理比启动时间戳新的 item的数据。
   

这个方案主要利用了ZooKeeper提供的leader election服务以及sequence节点的特性,几点好处在于:

    – 串行通知变为并行通知,且通知到达及时;
    – 变更数据的Push模式为Pull模式,降低了或去除了诸多耦合,包括:
            1) 去除trigger脚本与表字段及字段顺序的耦合;
            2) 去除应用节点与表字段顺序的耦合;
            3) 降低应用节点与表字段构成的耦合。
    – 应用节点无需复杂的包解析,简化后期维护。

当然为了该方案新增若干网元会给产品部署和维护带来一些复杂性,这算是不足之处吧。

四、Demo

这里有一个600多行代码的Demo,模拟新方案中几个角色:
    DB – trigger_sim.py
    应用节点 – app.c
    ccs_agent – ccs_agent.c

模拟的步骤大致如下(单机版):

a) 启动ZooKeeper
    $> zkServer.sh start
    JMX enabled by default
    Using config: /home1/tonybai/.bin/zookeeper-3.4.5/bin/../conf/zoo.cfg
    Starting zookeeper … STARTED

b) 启动ccs_agent
    $> ccs_agent
    This is [ccs-member0000000037], i am a leader
    /ccs node exists
    /ccs/employee_info_tab node exists
    /ccs/boss_info_tab node exists
    trigger listen thread start up!
    item expire thread start up!

c) 启动app

d) 使用trigger_sim.py模拟DB触发trigger
        $> trigger_sim.py employee_info_tab ADD 1234567

可以看到ccs_agent输出结果如下:
    table[employee_info_tab], oper_type[ADD], id[1234567]

app的输出如下:
    child event happened: type[4]
    item-0000000015
    employee_info_tab: execute [ADD 1234567]

大约30s后,ccs_agent会输出如下:
    [expire]: employee_info_tab: expire [item-0000000015]

模拟步骤在README里有写。这里仅是Demo代码,存在硬编码以及异常处理考虑不全面的情况,不要拍砖哦。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats