分类 技术志 下的文章

Transaction模式的C实现

提到Transaction模式(即事务模式),很多人会感到陌生。这并不奇怪,在大名鼎鼎的GoF的《Design Pattern》一书中,它仅仅是Command模式的别名罢了。不过在实际的开发中,我们却经常会遇到可以应用事务模式的场景。本文可以理解成Command模式在事务领域的应用,但这样说有些麻烦,我们莫不如直接称之为Transaction模式。

与前几篇设计模式C实现系列文章一样,这篇文章也源于对实际问题的思考和总结。这次的问题是这样的:我们的业务系统实现了一个ftp上传文件的功能,其v1版代码的结构简化后大致如下:

int ftp_upload_file(const char *filename, const remote_server_desc *desc) {
    int ret;

    ret = upload_local_file(filename, desc);
    if (ret)
        return ret;

    ret = remove_local_file(filename);
    if (ret)
        return ret;

    return rename_remote_file(filename, desc);
};

代码的大致流程是这样的:
1、首先调用upload_local_file,将本地文件(比如foo.txt)上传到远程主机(上传后名字为foo.txt.tmp)
2、然后调用remove_local_file,删除本地文件(如foo.txt)
3、最后调用rename_remote_file,对远程主机上的文件进行改名操作(如将foo.txt.tmp改为foo.txt)

正常情况下,这版代码工作的也很好,以下是正常情况下的输出:
upload [foo.txt.tmp] to host [10.10.12.123, incoming/txt] Ok!
remove localfile [foo.txt] Ok!
rename [foo.txt.tmp] to [foo.txt] Ok!

但明眼人都可以看出v1版本代码的问题,那就是对业务异常的处理不够理想,下面列举一些可能出现异常的环节:
1、upload_local_file可能出现异常,返回失败
这时文件也许已经上传成功,我们在退出整个上传流程之前,应该尝试调用remove_remote_file,删除远程主机上的文件,恢复系统状态到上传前状态;

2、remove_local_file可能出现异常,返回失败
此时文件已经上传成功,若不做任何处理而直接退出的话,会导致下次重复上传同名文件而出现覆盖异常。为了防止这一问题的发生,我们在退出整个上传流程之前,应该尝试调用remove_remote_file,删除远程主机上的文件,恢复系统状态;

3、rename_remote_file也可能出现异常,返回失败
此时文件已经上传成功,且本地文件已经被删除,若改名失败而不做任何处理,将会导致已经上传到远程主机上的文件永远不会被处理(因为后缀名为.tmp,远程主机上的处理程序无法识别)。为了应对这一异常,我们应该在退出整个上传流程之前,恢复本地文件,并删除已经上传到远程主机上的文件,以恢复系统状态。

于是,我们就有了v2版代码,见下面:

int ftp_upload_file(const char *filename, const remote_server_desc *desc) {
    int ret;

    ret = upload_local_file(filename, desc);
    if (ret) {
        (void)remove_remote_file(filename, desc);
        return ret;
    }

    ret = remove_local_file(filename);
    if (ret) {
        (void)remove_remote_file(filename, desc);
        return ret;
    }

    ret = rename_remote_file(filename, desc);
    if (ret) {
        (void)remove_remote_file(filename, desc);
        (void)recovery_local_file(filename);
        return ret;
    }

    return ret;
};

这样修改后,若rename出现异常,则执行结果会变为:
upload [foo.txt.tmp] to host [10.10.12.123, incoming/txt] Ok!
remove localfile [foo.txt] Ok!
rename [foo.txt.tmp] to [foo.txt] Failed!
remove [foo.txt.tmp] from host [10.10.12.123, incoming/txt] Ok!
recover localfile [foo.txt] Ok!

程序在出现异常后将系统状态恢复到未操作前,并会在下一次操作中重新尝试。可以看出这是一个典型的事务场景,即整个上传过程是一个不可分割的整体,其中包括的诸多操作要么都做,要么都不做。

一切初看上去都很美!但用优雅设计的尺度细致考量,我们就会发现一些问题:
首先,如果一个事务场景包含的操作序列很多,那代码中的异常处理将是很痛苦的事情,以最后一步操作为例,一旦异常出错,我们就需要显式做N步回退处理,代码必然显得十分繁琐。另外大量的错误码判断,也会引入诸多if,势必使得代码味道较差;
其次,事务操作的具体实现都暴露给调用者,这在调用者与事务实现之间引入耦合,不利于代码的单元测试与调试;
最后,类似的事务场景在系统中存在很多,如果按v2版本的实现方式,那么系统中将会存在大量类似结构的代码,也算是一种重复吧。

我们的解决手段无非还是面向接口和封装变化,于是我们就有了充分参考了Transaction模式解决方法的v3版代码。

/* 通用事务接口 transaction_unit.h */
struct transaction_unit_t {
    int (*execute)(struct transaction_unit_t *this, void *arg); /* alias: commit */
    int (*unexecute)(struct transaction_unit_t *this, void *arg); /* alias: rollback */
};

/* upload_request.h */
struct upload_request {
    char filename[PATH_MAX];
    char ip[16];
    char path[PATH_MAX];
};

/* ftp_upload_transaction_unit.h */
struct transaction_unit_t* ftp_upload_transaction_unit_new();
void ftp_upload_transaction_unit_destroy(struct transaction_unit_t **tu);

/* ftp_upload_transaction_unit.c */

typedef struct operation_pair operation_pair;
typedef APR_RING_HEAD(operation_pair_head_t, operation_pair) operation_pair_head_t;

struct operation_pair {
    APR_RING_ENTRY(operation_pair) link;
    int (*do_func)(struct upload_request* r);
    int (*undo_func)(struct upload_request* r);
};

struct ftp_upload_transaction_unit_t {
    struct transaction_unit_t tu;
    operation_pair_head_t     ops;
    operation_pair            *op; /* 记录操作异常所在单元 */
};

struct transaction_unit_t* ftp_upload_transaction_unit_new() {
    struct ftp_upload_transaction_unit_t *tu;
    tu  = (struct ftp_upload_transaction_unit_t*)malloc(sizeof(*tu));
    if (!tu) return NULL;

    memset(tu, 0, sizeof(tu));
    tu->tu.execute = ftp_upload_transaction_execute;
    tu->tu.unexecute = ftp_upload_transaction_unexecute;
    APR_RING_INIT(&(tu->ops), operation_pair, link);

    operation_pair *op = (operation_pair*)malloc(sizeof(*op)); /* 这里省略一些异常处理,下面也是如此 */
    op->do_func = upload_local_file;
    op->undo_func = remove_remote_file;
    APR_RING_ELEM_INIT(op, link);
    APR_RING_INSERT_TAIL(&(tu->ops), op, operation_pair, link);

    op = (operation_pair*)malloc(sizeof(*op));
    op->do_func = remove_local_file;
    op->undo_func = recover_local_file;
    APR_RING_ELEM_INIT(op, link);
    APR_RING_INSERT_TAIL(&(tu->ops), op, operation_pair, link);

    op = (operation_pair*)malloc(sizeof(*op));
    op->do_func = rename_remote_file;
    op->undo_func = NULL;
    APR_RING_ELEM_INIT(op, link);
    APR_RING_INSERT_TAIL(&(tu->ops), op, operation_pair, link);

    return (struct transaction_unit_t*)tu;
}

static int ftp_upload_transaction_execute(struct transaction_unit_t *tu, void *arg) {
    struct ftp_upload_transaction_unit_t *this = (struct ftp_upload_transaction_unit_t*)tu;

    operation_pair *op = NULL;
    int ret = 0;

    APR_RING_FOREACH(op, &(this->ops), operation_pair, link) {
        if (op) {
            if (op->do_func) {
                ret = op->do_func(arg);
                if (ret) {
                    this->op = op;
                    return ret;
                }
            }
        }
    }

    return ret;
}

static int ftp_upload_transaction_unexecute(struct transaction_unit_t *tu, void *arg) {
    struct ftp_upload_transaction_unit_t *this = (struct ftp_upload_transaction_unit_t*)tu;

    operation_pair *op = this->op;
    if (!op)
        return 0;

    do {
        if (op->undo_func) {
            op->undo_func(arg);
        }
        op = APR_RING_PREV(op, link);
    } while(op && (op != APR_RING_SENTINEL(&(this->ops), operation_pair, link)));

    return 0;
}

/* main.c */
int ftp_upload_file(struct upload_request *r) {
    int ret;
    struct transaction_unit_t *tu = ftp_upload_transaction_unit_new();

    /* 事务开始 */
    ret = tu->execute(tu, (void*)r);
    if (ret)
        tu->unexecute(tu, (void*)r);
    /* 事务结束 */

    return ret;
};

int main(int argc, const char *argv[])
{
    struct upload_request r = {"foo.txt", "10.10.12.123", "incoming/txt"};
    return ftp_upload_file(&r);
}

代码有些长,所以省略了destroy等一些非关键性的实现代码。这里将事务模式的基本接口抽象为transaction_unit,而ftp_upload_transaction_unit则是transaction_unit接口的一个实现,它通过一个环形链表来组织由事务处理函数(do_func)以及对应事务回滚函数(undo_func)组成的操作单元。沿着链表正向遍历,即执行事务处理操作集合;一旦某个事务操作出现异常,便改为沿着链表反向遍历,即执行事务回滚操作集合,这样也就实现了一种具体的事务模式。

注意:这里仅是一种事务模式的实现思路,但其实现是否符合事务的要求还不一定,要给出一个完备的事务实现可并非易事,实现FTP上传事务更非易事。

原本设计模式的C实现系列文章在上一篇《Chain of Responsibility模式的C实现》之后就应该嘎然而止的,但变化总比计划快,于是就有了这篇文章。

Chain of Responsibility模式的C实现

又是一个行为类的模式,似乎这类模式在使用C语言开发的项目中适应性更强,而另外两类模式创建型和结构型则略显不受待见^_^。

Chain of Responsibility模式(中文名:职责链模式)是一个不算复杂的模式。虽不复杂,但用好了同样可以解决大问题。个人觉得其最大的好处就在于可以动态地重组针对一类对象的处理流程。正是得益于这一优势,它才可以在纷繁芜杂的业务领域站稳脚跟。

我们遇到的问题是这样的:有一类消息需要我们的系统处理,消息在系统入口处需经过种种业务层面上的校验,只有通过所有校验的消息才被允许进入到我们的系统中并被视为合法的消息。针对来自不同企业的消息,系统在入口处的校验规则是不同的,对于信用度较高的企业,系统实施的校验较少;而对于信用度不高的企业或新签约企业来说,其校验规则就相对多些;随着企业的信用度的变化,系统也会自动地调整对其下发消息的校验规则集。

最初关于这个部分的系统伪码大致是这样的:
int check_msg(corp_info, msg) {
    if (corp_info->need_check_source) {
        if (FAILED == check_source(msg))
            return xx;
    }

    if (corp_info->need_check_destination) {
        if (FAILED == check_destination(msg))
            return xx;
    }

    if (corp_info->need_check_priority) {
        if (FAILED == check_priority(msg))
            return xx;
    }

    if (corp_info->need_check_content) {
        if (FAILED == check_content(msg))
            return xx;
    }

    return 0;
}

在check_msg外部,系统根据企业的信用度设置corp_info中的多个check feature开关,诸如need_check_source、need_check_content等,从而使得check_msg内部可以根据企业的不同feature开关情况,对企业发送的消息实施不同的校验规则。

这里消息校验的请求者与消息校验的处理者具有一定的耦合,另外check_msg中满眼的if语句也让我们的神经为之紧绷!于是我们尝试移除if,尝试降低请求者和执行者之间的耦合。在《设计模式》中我们找到了Chain of Responsibility模式,我们决定试试!

我们首先定义了handler_t接口:

struct handler_t {
    void (*set_successor)(struct handler_t *this, struct handler_t *successor);
    struct handler_t* (*get_successor)(struct handler_t *this);
    int (*handle_request)(struct handler_t *this, void *obj, void *args);
    int type; /* handler类型 */
};

接下来,我们根据例子的需要逐个定义该接口的实现:source_checker、destination_checker、priority_checker和content_checker。以source_checker为例:

/* source_checker.h */
struct handler_t* source_checker_new();
void source_checker_destroy(struct handler_t **h);

/* source_checker.c */
struct source_checker_t {
    struct handler_t h;
    struct handler_t *successor;
};

static void _set_successor(struct handler_t *this, struct handler_t *successor) {
    struct source_checker_t *h = (struct source_checker_t*)this;
    h->successor = successor;
}

static struct handler_t* _get_successor(struct handler_t *this) {
    struct source_checker_t *h = (struct source_checker_t*)this;
    return h->successor;
}

static int _handle_request(struct handler_t *this, void *obj, void *args) {
    struct source_checker_t *h = (struct source_checker_t*)this;
    struct msg_t *msg = (struct msg_t*)obj;

    if (校验失败) /* 伪码 */
        return FAILED;
    printf(“[source_checker]: check msg – [%s]\n”, msg->msg_id);

    if (h->successor)
        return (h->successor->handle_request(h->successor, obj, args));

    return SUCCESS;
}

struct handler_t* source_checker_new() {
    struct source_checker_t *h;

    h = (struct source_checker_t*)malloc(sizeof(*h));
    if (!h) return NULL;

    memset(h, 0, sizeof(*h));
    h->h.set_successor = _set_successor;
    h->h.get_successor = _get_successor;
    h->h.handle_request = _handle_request;
    h->h.type = SOURCE_CHECKER;

    return (struct handler_t*)h;
}

void source_checker_destroy(struct handler_t **h) {
    struct source_checker_t *p = (struct source_checker_t*)(*h);

    if (p) free(p);
    (*h) = NULL;
}

destination_checker、priority_checker和content_checker与source_checker的实现类似,关键在于_handle_request的实现不同。

现在我们就可以在初始化阶段为不同企业组装不同的业务校验流程了,假设我们有两家企业A和B,A企业下发的消息需要进行全部业务校验,而B企业下发的消息仅需进行source check和destination check:

/* A企业消息的业务校验链 */
struct handler_t *A_destination_checker = destination_checker_new();
struct handler_t *A_priority_checker = priority_checker_new();
struct handler_t *A_content_checker = content_checker_new();
struct handler_t *A_msg_checker = source_checker_new();

A_msg_checker->set_successor(A_msg_checker, A_destination_checker);
A_destination_checker->set_successor(A_destination_checker, A_priority_checker);
A_priority_checker->set_successor(A_priority_checker, A_content_checker);

/* B企业消息的业务校验链 */
struct handler_t *B_destination_checker = destination_checker_new();
struct handler_t *B_msg_checker = source_checker_new();

B_msg_checker->set_successor(B_msg_checker, B_destination_checker);

我们可以将msg_checker的放入corp_info中,这样check_msg的新实现如下:
int check_msg(corp_info, msg) {
    return corp_info->msg_checker->handle_request(corp_info->msg_checker, (void*)msg, NULL);
}

这样通过A企业下发的消息testAmsg通过check_msg得到的结果是:
[source_checker]: check msg – [testAmsg]
[destination_checker]: check msg – [testAmsg]
[priority_checker]: check msg – [testAmsg]
[content_checker]: check msg – [testAmsg]

而B企业下发的消息testBmsg通过check_msg得到的结果则是:
[source_checker]: check msg – [testBmsg]
[destination_checker]: check msg – [testBmsg]

前面说过动态重组针对某一对象的业务流程是职责链模式一大特点。当某企业信用度发生变化时,该企业对应的checker链也会动态修改。比如当企业A信用度增加时,系统将去除其对应的content check流程,去除过程的实现如下:

struct handler_t *h = A_msg_checker;
struct handler_t *successor = h->get_successor(h);

while (successor) {
    if (successor->type == CONTENT_CHECKER) {
        h->set_successor(h, successor->get_successor(successor));
        break;
    }
   
    h = successor;
    successor = successor->get_successor(successor);
}

重组校验链后,企业A下发的消息testAmsg通过msg_check得到的结果就变成了:
[source_checker]: check msg – [testAmsg]
[destination_checker]: check msg – [testAmsg]
[priority_checker]: check msg – [testAmsg]

也许大家也看到了职责链模式的缺点,那就是每增加一个业务处理对象就要增加一个handler_t的具体实现,如诸多xx_checker,在C语言开发中这至少需要一个头文件与一个源文件。但职责链模式对降低请求者与处理者之间的耦合,以及支持职责链的动态重组方面还是会给你带来很大帮助的。是否使用这种模式,需要你自己根据实际情况权衡利弊后做出选择。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 Go语言编程指南
商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats