标签 API 下的文章

利用ZooKeeper服务实现分布式系统的Leader选举

每次与Java组的同事们坐下来谈技术、谈理想、谈人生时,Java组的同事总会向我们投来羡慕的眼光:卧槽!又是自己开发的工具,太NB了。这时C程序 员们的脸上就会洋溢出自豪的笑容,然后内心骂道:谁让我们没有现成的呢。另一个空间里的某些“无C不欢”们或者某些“C Guru”们会骂道:靠,有了也不用,自己写!

有时候,C程序员真的有一种下意识:不情愿使用其他语言开发的工具、框架或服务,且比其他程序员更爱“重新发明轮子”(有利有弊)。也许这是某种 骨子里的自负在搞怪;另外一个极端:今天和我聊天的一个经验丰富的C程序员还在忧虑:如果离职是否有公司会要他:(。

其实这个时代的C程序员一直活得挺纠结^_^。

这个世界,软硬件发展日新月异,越来越多的后端程序用Java等其他语言实现。Java高级选手在这个世界上也甚是吃香,这个你看看各大招聘网站 就知道了。再听听坊间“BAT”三巨头给出的高高在上的offer价格,也可以看出Java程序员是多么的有“钱途”和受欢迎了。当然拿好offer的前提是你的Java底子不薄。

其实无论用什么编程语言,成为牛人后,钱途也都是杠杠的。

没有什么好的开场白,于是有了上面一些“胡言乱语”。我们言归正传。

本文是一篇初级技术博文。讲的是如何使用ZooKeeper C API通过ZooKeeper的服务实现分布式系统的Leader选举。当然这一试验是为了尝试解决我们自己的分布式系统在集中配置数据分发这一环节上的 一个“固疾”。还好我还不那么纠结,也没有重新实现ZooKeeper的冲动,于是我就用了ZooKeeper这一Java实现的成熟的分布式 系统的服务框架。

* 搭建ZooKeeper服务环境

    – 下载官方stable release版本 – ZooKeeper3.4.5。解压后,将$ZooKeeper_INSTALL_PATH/bin加入到PATH变量中(其中ZooKeeper_INSTALL_PATH为解压后ZooKeeper-3.4.5目录的绝对路径)。

    – 试验环境下,最简单的ZooKeeper用法就是使用单机版。
      进入到$ZooKeeper_INSTALL_PATH/conf下,将zoo_sample.cfg改名为zoo.cfg,即可作为单机版ZooKeeper的配置文件。当然你也可以像我一样随意修改修改:

      # The number of milliseconds of each tick
   tickTime=2000
   # The number of ticks that the initial
   # synchronization phase can take
   initLimit=5
   # The number of ticks that can pass between
   # sending a request and getting an acknowledgement
   syncLimit=2

   dataDir=/home/tonybai/proj/myZooKeeper
   # the port at which the clients will connect
   clientPort=2181

       
      如果你要体验多机版ZooKeeper服务,那你还要继续动动手脚,以双机版为例,假设有两个ZooKeeper节点(10.0.0.13和10.0.0.14):

      10.0.0.13上的ZooKeeper节点1的配置文件如下:

     # The number of milliseconds of each tick
   tickTime=2000
   # The number of ticks that the initial
   # synchronization phase can take
   initLimit=5
   # The number of ticks that can pass between
   # sending a request and getting an acknowledgement
   syncLimit=2

   dataDir=/home/tonybai/proj/myZooKeeper
   # the port at which the clients will connect
   clientPort=2181

   server.1=10.0.0.13:2888:3888 
   server.2=10.0.0.14:2888:3888

     10.0.0.14上的ZooKeeper节点2的配置文件如下:

     # The number of milliseconds of each tick
   tickTime=2000
   # The number of ticks that the initial
   # synchronization phase can take
   initLimit=5
   # The number of ticks that can pass between
   # sending a request and getting an acknowledgement
   syncLimit=2

   dataDir=/home/tonybai/proj/myZooKeeper
   # the port at which the clients will connect
   clientPort=2181

   server.1=10.0.0.13:2888:3888
   server.2=10.0.0.14:2888:3888

      别忘了在每个节点的dataDir下分别创建一个myid文件:
      在10.0.0.13节点1上执行:
      
     $> echo 1 > myid

      在10.0.0.14节点2上执行:
     
   $> echo 2 > myid

      启动ZooKeeper执行:
      $> zkServer.sh start

      模拟一个客户端连到ZooKeeper服务上:
      $> zkCli.sh

      成功链接后,你将进入一个命令行交互界面:
       [zk: 10.0.0.13:2181(CONNECTED) 1] help
    ZooKeeper -server host:port cmd args
    connect host:port
    get path [watch]
    ls path [watch]
    set path data [version]
    rmr path
    delquota [-n|-b] path 

        … …

* 选主原理

   ZooKeeper在选主过程中提供的服务就好比一栋名为"/election"小屋,小屋只有一个门,各节点只能通过这个门逐个进入。每个节点进入后, 都会被分配唯一编号(member-n),编号n自小到大递增,节点编号最小的自封为Leader,其他节点只能做跟班的(follower) – 这年头还是小的吃香:原配干不过小三儿,小三儿干不过小四儿,不是么^_^!)。
   每当一个节点离开,ZooKeeper都会通知屋内的所有节点,屋内节点收到通知后再次判断一下自己是否是屋内剩余节点中编号最小的节点,如果是,则自封为Leader,否则为Follower。

   再用稍正式的语言重述一遍:

   各个子节点同时在某个ZooKeeper数据路径/election下建立"ZOO_SEQUENCE|ZOO_EPHEMERAL"节点 – member,且各个节点监视(Watch) /election路径的子路径的变更事件。ZooKeeper的sequence节点特性保证节点创建时会被从小到大加上编号。同时节点的 ephemeral特性保证一旦子节点宕机或异常停掉,其对应的member节点会被ZooKeeper自动删除,而其他节点会收到该变更通知,重新判定 自己是leader还是follower以及谁才是真正的leader。

* 示例代码

关于ZooKeeper的C API的使用资料甚少,但这里就偏偏要用C API举例。

C API的安装方法:进入$ZOOKEEPER_INSTALL_PATH/src/c下面,configure->make->make install即可。

ZooKeeper的C API分为同步与异步两种模式,这里简单起见用的都是同步机制。代码不多,索性全贴出来。在这里能checkout到全部代码。

/* election.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "zookeeper.h"

static int
is_leader(zhandle_t* zkhandle, char *myid);

static void
get_node_name(const char *buf, char *node);

struct watch_func_para_t {
    zhandle_t *zkhandle;
    char node[64];
};

void
election_children_watcher(zhandle_t* zh, int type, int state,
                      const char* path, void* watcherCtx)
{
    int ret = 0;

    struct watch_func_para_t* para= (struct watch_func_para_t*)watcherCtx;

    struct String_vector strings;
    struct Stat stat;

    /* 重新监听 */
    ret = zoo_wget_children2(para->zkhandle, "/election", election_children_watcher,
                             watcherCtx, &strings, &stat);
    if (ret) {
        fprintf(stderr, "child: zoo_wget_children2 error [%d]\n", ret);
        exit(EXIT_FAILURE);
    }

    /* 判断主从 */
    if (is_leader(para->zkhandle, para->node))
        printf("This is [%s], i am a leader\n", para->node);
    else
        printf("This is [%s], i am a follower\n", para->node);

    return;
}

void def_election_watcher(zhandle_t* zh, int type, int state,
        const char* path, void* watcherCtx)
{
    printf("Something happened.\n");
    printf("type: %d\n", type);
    printf("state: %d\n", state);
    printf("path: %s\n", path);
    printf("watcherCtx: %s\n", (char *)watcherCtx);
}

int
main(int argc, const char *argv[])
{

    const char* host = "10.0.0.13:2181";
    zhandle_t* zkhandle;
    int timeout = 5000;
    char buf[512] = {0};
    char node[512] = {0};

    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    zkhandle = zookeeper_init(host, def_election_watcher, timeout,
                              0, "Zookeeper examples: election", 0);
    if (zkhandle == NULL) {
        fprintf(stderr, "Connecting to zookeeper servers error…\n");
        exit(EXIT_FAILURE);
    }

    /* 在/election下创建member节点 */
    int ret = zoo_create(zkhandle,
                        "/election/member",
                        "hello",
                        5,
                        &ZOO_OPEN_ACL_UNSAFE,  /* a completely open ACL */
                        ZOO_SEQUENCE|ZOO_EPHEMERAL,
                        buf,
                        sizeof(buf)-1);
    if (ret) {
        fprintf(stderr, "zoo_create error [%d]\n", ret);
        exit(EXIT_FAILURE);
    }

    get_node_name(buf, node);
    /* 判断当前是否是Leader节点 */
    if (is_leader(zkhandle, node)) {
        printf("This is [%s], i am a leader\n", node);
    } else {
        printf("This is [%s], i am a follower\n", node);
    }

    struct Stat stat;
    struct String_vector strings;
    struct watch_func_para_t para;
    memset(&para, 0, sizeof(para));
    para.zkhandle = zkhandle;
    strcpy(para.node, node);

    /* 监视/election的所有子节点事件 */
    ret = zoo_wget_children2(zkhandle, "/election", election_children_watcher, &para, &strings, &stat);
    if (ret) {
        fprintf(stderr, "zoo_wget_children2 error [%d]\n", ret);
        exit(EXIT_FAILURE);
    }

    /* just wait for experiments*/
    sleep(10000);

    zookeeper_close(zkhandle);
}

static int
is_leader( zhandle_t* zkhandle, char *myid)
{
    int ret = 0;
    int flag = 1;

    struct String_vector strings;
    ret = zoo_get_children(zkhandle, "/election", 0, &strings);
    if (ret) {
        fprintf(stderr, "Error %d for %s\n", ret, "get_children");
        exit(EXIT_FAILURE);
    }

    /* 计数 */
    for (int i = 0;  i < strings.count; i++) {
        if (strcmp(myid, strings.data[i]) > 0) {
            flag = 0;
            break;
        }
    }

    return flag;
}

static void
get_node_name(const char *buf, char *node)
{
    const char *p = buf;
    int i;
    for (i = strlen(buf) – 1; i >= 0; i–) {
        if (*(p + i) == '/') {
            break;
        }
    }

    strcpy(node, p + i + 1);
    return;
}

编译这个代码:
$> gcc -g -std=gnu99 -o election election.c -DTHREADED -I/usr/local/include/zookeeper -lzookeeper_mt -lpthread

验证时,我们在不同窗口启动三次election程序:

窗口1, election启动:

$> election
Something happened.
type: -1
state: 3
path:
watcherCtx: Zookeeper examples: election
This is [member0000000001], i am a leader

窗口2,election启动:

$> election
Something happened.
type: -1
state: 3
path:
watcherCtx: Zookeeper examples: election
This is [member0000000002], i am a follower

此时窗口1中的election也会收到/election的字节点增加事件,并给出响应:

This is [member0000000001], i am a leader

同理当窗口3中的election启动时,窗口1和2中的election都能收到变动通知,并给予响应。

我们现在停掉窗口1中的election,大约5s后,我们在窗口2中看到:

This is [member0000000002], i am a leader

在窗口3中看到:

This is [member0000000003], i am a follower

可以看出窗口2和3中的election程序又做了一次自我选举。结果窗口2中的election由于节点编号最小而被选为Leader。

一种基于内存映射文件的系统运行数据提取方法

这是我无意中想到的一个方法,估计这个方法已经不是什么新鲜的东西了,很可能在类似的问题场景中早已经被使用了。不过这里还是要说说我的思维过程。

近期在学习一些Linux性能查看和分析方面的工具,比如top、iostat、vmstat以及sar等。在学习过程中我发现这些工具有个共同的特点,那就是她们采集的Linux运行数据都是从/proc下的文件中实时获取并计算而得出的。众所周知,/proc是Linux内核维护的一个虚拟文件系统,他允许用户在Linux运行时查看内核运行数据(用户可以像查看普通文件一样查看/proc下的目录和文件),甚至是运行时实时改变内核设置。Linux实现/proc的细节不是这里要关注的,吸引我的是Linux的这种提取运行数据的设计。这个设计将Linux运行数据的产生实现细节与第三方性能采集工具间的耦合最大化地解开,这样一来/proc就像是一种Linux的基础服务,为用户提供一种实时的运行数据信息。而用户侧的运行数据查看工具也可以根据用户的需求自由定制,因此有了top、iostat、vmstat、iotop、sar等关注点不同的工具。

好了,说完/proc后,再来说说我们的产品。用户长期以来一直在抱怨我们的产品监控和维护方面手段太过单一,产品就像是一个黑盒,没有提供一种自我运行观察的能力,让客户看不清阿看不清,用户无法实时获取当前某个节点上的业务运行状况,无法采集到这些业务运行的实时基础数据,这的确是我们长期以来的短板(以前这块受重视度也的确不足)。虽然这两年我们在改善运维手段方面的投入已经加大,并收到一些显著的效果,但方案都是集中的,且相对重量级的,不那么敏捷灵活 – 在单节点上依旧无法简单地获取该节点的运行数据。

结合/proc的设计以及我们所遇到的问题,我有了一个大胆的想法:是否可以给我们的业务系统也加上一种类似Linux /proc这样的可提供基础运行数据的服务能力呢?于是就有了下面的解决方法。

Linux /proc下面的数据文件是Linux Kernel维护的,并允许用户层的进程实时查看和配置数据。而对于我们的产品而言,提供基础数据的产品实例与提取基础数据的第三方程序是两个独立的用户level的进程,显然我们需要找到一种让这两个进程实时通信、低耦合的且性能代价极低的方法。

我首先想到的是文件,这似乎和/proc的方式一样。你查看一下sysstat源码会发现,像iostat、sar等工具都是用fopen以"r"方式打开/proc/下的各种stat文件,匹配和读取指标项后再统计的。但在User层,两个无亲缘关系进程共同操作一个文件 – 一个读,一个写,the file position indicator是很难控制的,可能涉及文件锁(flock/fcntl),还要考虑使用的库函数是否是带缓冲的(fread/fgets都是带缓冲 的,不能用),写端需要及时fsync/fflush。总而言之,这么做是甚为自讨没趣的,会给两个程序的实现都带来很大的复杂性以及各种“坑”的。

那用named fifo如何呢?一但用named fifo,这两个进程就会产生启动依赖,如果一端没有启动,另一端会一直阻塞;而且通过fifo传递多种业务数据还可能存在打包和解包的过程,实现起来复杂的很。这显然是耦合十分严重的糟糕方案。

两个进程既要有共同的识别目标,就像/proc/cpuinfo这样的已知路径,一个进程还要能及时地得到另外一个进程运行时的数据,我们不妨尝试一下内存文件映射这个方案:运行数据提供的进程映射一个已知目标文件,比如perf/xxstat,然后在映射后的地址上创建和更新指标数据。比如我们建立一个整型数组,数组的每个元素都代表一种运行指标;而运行数据提取进程同样映射该文件,并在映射后获得数组中的各个元素值。下面是一个示例程序:

/* producer */
int
main()
{
    FILE *fp = NULL;

    errno = 0;
    fp = fopen(STAT_FILE, "w+");
    if (fp == NULL) {
        printf("can not create stat file , err = %d\n", errno);
        return -1;
    }

    errno = 0;
    long size = sysconf(_SC_PAGESIZE);
    if (ftruncate(fileno(fp), size) != 0) {
        printf("can not set stat file size, err = %d\n", errno);
        fclose(fp);
        return -1;
    }

    errno = 0;
    char *p = NULL;
    p = mmap(NULL, size, PROT_WRITE|PROT_READ, MAP_SHARED, fileno(fp), 0);
    if (p == MAP_FAILED) {
        printf("can not mmap file, error = %d\n", errno);
        fclose(fp);
        return -1;
    }

    errno = 0;
    if (fclose(fp) != 0) {
        printf("can not close file, error = %d\n", errno);
        return -1;
    }

    /* round up to 8 */
    while((int)p % 8 != 0) {
        p++;
    }

    long long *q = (long long*)p;
    q[0] = 1;
    q[1] = 1000;
    q[2] = 10000;
    q[3] = 100000;

    while(1) {
        q[0] += 1;
        q[1] += 10;
        q[2] += 100;
        q[3] += 1000;
        usleep(200);
    }

    return 0;
}

该producer程序首先尝试以"w+"方式打开xxstat文件,并设置文件的大小,然后调用mmap做内存文件映射,理论上来说mmap成功时返回的地址一定是按该平台下最严格内存系数对齐的地址,但这里为了安全起见,又做了一次内存地址的圆整。producer以映射的地址为首地址,建立了一个包含四个元素的、每个元素大小为8字节的整型数组,其中每个元素模拟一个运行指标。在while(1)循环中,producer模拟更新这四个指标数据。

下面是提取producer运行数据的例子程序,其映射过程与producer类似,这里就不贴出完整代码了,完整代码可在这里下载。

/* reader.c */

int
main()
{
    FILE *fp = NULL;
    … …

    char *p = NULL;
    p = mmap(NULL, size, PROT_READ,
             MAP_SHARED, fileno(fp), 0);
    if (p == MAP_FAILED) {
        printf("can not mmap file, error = %d\n", errno);
        fclose(fp);
        return -1;
    }

    … …

    long long *q = (long long*)p;

    while(1) {
        printf("%lld\t\t%lld\t\t%lld\t\t%lld\n", q[0], q[1], q[2], q[3]);
        sleep(1);
    }

    return 0;
}

在producer执行一段时间后,我们可以用reader去提取producer的实时运行数据了。

$ reader
2583        26820        268200        2682000
5793        58920        589200        5892000
9142        92410        924100        9241000
12431        125300        1253000        12530000
15586        156850        1568500        15685000
… …

需要注意的是两个进程映射的虽然是同一个文件,但各自进程空间映射的地址是不同的。如果在指标里存储地址数据,那另外一个进程在访问该地址时必然会出现问题。

在这个方案中,由于两个进程是读写同一块内存(虽然在各自进程空间的地址是不同的),因此数据是实时的。但由于两个进程间并没有任何同步机制,可能会产生误差,就好比一个进程中的两个线程对进程中某块地址空间一读一写这种情况一样。不过对于我们这种场景,这个问题是一般是可以被容忍和接受的,毕竟我们通过运行数据只是想了解一种运行趋势而已。如果producer中存在多个有亲缘关系的子进程或多线程要同时更新基础运行数据,那势必是要用锁或其他原子操作做数据操作的同步的。另外我们用的是内存映射具名的文件,OS会定期将数据刷到磁盘上,不过这个消耗对于小文件来说,对整体性能影响可忽略不计。

一旦业务系统具备了提供基础运行数据的能力,我们就可以根据我们的需求按照数据的格式打造我们所需要的各类数据提取和分析工具了。如果需要长期记录业务系统的运行情况,我们也可以实现类似sar这样的工具,以在后台定期对系统的运行数据进行记录,并提供历史查询等相关功能。

这种基于内存映射文件的方法还有一个好处,那就是我们可以用任何支持mmap调用的编程语言来实现数据提取工具,而不一定非得用C/C++这种原生适配Linux API的语言。

如果你觉得这种方案可行,那后续的重点就是基础运行数据的设计问题了。罗马不是一天建成的,/proc下的基础数据也不是一天就设计到位的。在基础数据设计这方面也是需要有很多考虑的,比如是文本还是二进制,用什么类型数据,还可能需要考虑一些数据对齐问题等。当然这就不是本文的重点了,就不细说了。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言进阶课 AI原生开发工作流实战 Go语言精进之路1 Go语言精进之路2 Go语言第一课 Go语言编程指南
商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats