标签 http 下的文章

Service Weaver:以单体形式编码,以微服务形式部署

本文永久链接 – https://tonybai.com/2023/10/09/service-weaver-coding-in-monolithic-deploy-in-microservices

分布式应用的主流架构模式演化为微服务架构已经有些年头了。微服务、DevOps、持续交付和容器技术(k8s)是构成最初云原生概念的核心要素。它们相生相拌,共同演进,并推动了云计算全面进入云原生时代

云原生应用普遍采用微服务架构,遗留的单体应用程序会逐步演进并拆分为多个微服务,新应用则会直接采用微服务架构进行设计与实现。微服务的好处是显而易见的:

  • 每个微服务都编译为一个二进制文件并独立部署和扩展,可以提高资源利用率;
  • 一个微服务的崩溃不会影响到其他微服务,限制了错误的传播半径,从而提高了容错能力;
  • 改善了抽象的边界。微服务需要清晰明确的API,降低了代码纠缠不清的可能性;
  • 灵活部署,不同微服务的二进制文件可以以不同频率发布,从而实现更敏捷的代码升级。
  • … …

不过做过微服务的朋友都知道,微服务架构带来的不仅仅是好处,还有很多挑战:

  • 单体应用内的模块间可通过内存直接交互,而在微服务架构的应用中,多个微服务需要进行跨进程跨机器的通信,对数据的序列化和反序列化操作必不可少,其开销很难避免,对应用性能是有较大损耗的;
  • 研究表明,三分之二的故障是由于不同版本的微服务之间的交互引发的,这会损害应用的正确性;
  • 每个微服务开发人员都有自己的发布和管理计划,而无法像单体应用那样使用单个二进制文件来统一构建、测试和部署,这给微服务开发管理带来了很高的复杂性;
  • API管理变得复杂。一旦某个微服务发布了,它的API很难在不影响其他使用该API的服务的情况下进行变更,新老API同时存在是常态;
  • 减慢了应用程序开发的速度。在进行会影响多个微服务的更改时,开发人员无法原子地实现和部署这些更改。他们必须仔细计划如何根据自己的发布时间表在n个微服务中引入变更;
    … …

由此可见,微服务并非“银弹”,人们在消除微服务的缺点方面做了很多工作,不可谓不努力,但收效甚微,甚至出现了回归monolith(大单体)的现象

今年年初Google发布了一个在这方面的探索成果:Service Weaver。Service Weaver不仅仅是一个分布式应用的开发框架,更是一个旨在减少或消除微服务弊端的探索实验的结论。

Service Weaver到底有何与众不同?它的核心抽象是什么?它的最大优点又是什么呢?在这一篇文章中,我就和大家一起来学习和了解一下Service Weaver这个开发框架。

1. Service Weaver简介

Service Weaver是Google开源的一个编程框架(programming framework) ,用于编写、部署和管理用Go开发的分布式应用程序。

注:随着Service Weaver的演进,后续可能会有其他语言的版本。

使用Service Weaver,你可以像编写在本地机器上运行的传统单进程Go可执行文件一样编写应用程序。然后,将其部署到云中,该框架会将其分解为一组微服务,并将其与云提供商(主要是k8s)集成(如监控、跟踪、日志等)。简单来说,就是“以单体形式编码,以微服务形式部署”

开篇提过,Google开源的Service Weaver本就是为解决微服务架构在实践中出现的诸多问题而提出的创新思路与实验,为此它提出并实现了三个核心原则

  • 在构建阶段,开发人员只需编写模块化的单体程序;
  • 在首次部署和运行阶段,Service Weaver会将逻辑组件分配给物理进程,可以是本地的一个进程,也可以是多个进程,当然最主流的还是分配给运行在公有云提供商k8s的不同pod;
  • 以原子方式升级变更应用,彻底杜绝应用的不同版本间的交互。

这么说依然很抽象,闻名不如见面,接下来我们就用一些例子来看一下Service Weaver是如何践行这三个原则的。

我们先来看看用Service Weaver开发的“Hello, World”程序长什么样子。

2. Hello, World

安装Service Weaver很简单,只需执行下面命令:

$go install github.com/ServiceWeaver/weaver/cmd/weaver@latest

$weaver
USAGE

  weaver generate                 // weaver code generator
  weaver version                  // show weaver version
  weaver single    <command> ...  // for single process deployments
  weaver multi     <command> ...  // for multiprocess deployments
  weaver ssh       <command> ...  // for multimachine deployments
  weaver gke       <command> ...  // for GKE deployments
  weaver gke-local <command> ...  // for simulated GKE deployments
  weaver kube      <command> ...  // for vanilla Kubernetes deployments

DESCRIPTION

  Use the "weaver" command to deploy and manage Weaver applications.

  The "weaver generate", "weaver version", "weaver single", "weaver multi", and
  "weaver ssh" subcommands are baked in, but all other subcommands of the form
  "weaver <deployer>" dispatch to a binary called "weaver-<deployer>".
  "weaver gke status", for example, dispatches to "weaver-gke status".

注:Weaver要求Go版本高于1.21。另外在MacOS上安装使用时,官方文档提到要开启export CGO_ENABLED=1; export CC=gcc; 不过CGO_ENABLED=1通常是默认的。另外我使用CC=clang也可以正常安装和使用weaver。

安装完Weaver后,我们就来看一个基于Weaver的Hello, World示例,了解一下基于Weaver框架开发的应用的基本结构。

我们创建一个hello目录,然后在hello下面使用go mod init hello来初始化一个go module。这个例子非常简单,hello目录下只有一个main.go:

// serviceweaver-examples/hello/main.go

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/ServiceWeaver/weaver"
)

func main() {
    if err := weaver.Run(context.Background(), serve); err != nil {
        log.Fatal(err)
    }
}

// app is the main component of the application. weaver.Run creates
// it and passes it to serve.
type app struct {
    weaver.Implements[weaver.Main]
}

// serve is called by weaver.Run and contains the body of the application.
func serve(context.Context, *app) error {
    fmt.Println("Hello, World")
    return nil
}

我们看到:示例导入了weaver包,然后在main函数中调用weaver.Run函数。Run函数的原型如下:

// github.com/ServiceWeaver/weaver/weaver.go
func Run[T any, P PointerToMain[T]](ctx context.Context, app func(context.Context, *T) error) error

weaver充分利用了Go 1.18引入的泛型,Run就是一个泛型函数,它的第二个参数为app,这是一个函数类型的参数。顾名思义,app这个函数封装了整个应用的主运行逻辑。在hello这个示例中,我们为Run的第二个参数传入的是serve。而serve的逻辑非常简单,就是输出“Hello, World”,然后就返回nil了,返回nil表示正常退出。weaver.Run会处理应用的生命周期,比如优雅关闭等,serve函数就只需要关心业务逻辑即可,通过这种方式,通用的服务框架代码和业务代码便分离开来,降低了耦合,提高可维护性。

到这里,很多读者可能注意到了:由于示例过于简单,serve函数并没有使用传入的第二个参数(类型为*app),但在用Weaver开发的实用程序中,Run的第二个参数是整个应用的核心,并且app这个类型恰好就是weaver.Run泛型函数中T的类型实参(type argument)。

Run函数的注释中明确说明:T类型(app)必须是一个struct类型且包含一个weaver.Implements[weaver.Main]的嵌入字段,在该示例中app类型的定义恰是如此:

// serviceweaver-examples/hello/main.go
type app struct {
    weaver.Implements[weaver.Main]
}

说到这里,就不得不提到Service Weaver的核心抽象:组件(component)了!基于Service Weaver框架开发的应用是由一个组件的集合。实际上,Weaver中的组件就是一个普通Go接口的实现,编写代码时,组件间的交互也是通过接口的方法调用完成的。

那么,上面示例中的组件在哪里呢?上面的示例仅包含一个Weaver应用必须的组件:main组件。app类型就理解为一个main组件,它通过嵌入weaver.Implements[weaver.Main]这个类型实现了weaver.Main接口:

// Main is the interface implemented by an application's main component.
type Main interface{}

对于Weaver应用而言,main组件是不可获取的,如果注释掉app结构体类型中weaver.Implements[weaver.Main]这一行,那么无论执行weaver generate命令还是go run命令,你得到的都会是错误:

$weaver generate .
-: # hello
./main.go:12:22: *app does not satisfy "github.com/ServiceWeaver/weaver".PointerToMain[app] (missing method implements)
/Users/tonybai/Test/Go/service-weaver/hello/main.go:12:12: *app does not satisfy "github.com/ServiceWeaver/weaver".PointerToMain[app] (missing method implements)

$go run .
# hello
./weaver_gen.go:34:40: cannot use (*app)(nil) (value of type *app) as "github.com/ServiceWeaver/weaver".InstanceOf["github.com/ServiceWeaver/weaver".Main] value in variable declaration: *app does not implement "github.com/ServiceWeaver/weaver".InstanceOf["github.com/ServiceWeaver/weaver".Main] (missing method implements)
./weaver_gen.go:37:25: cannot use (*app)(nil) (value of type *app) as "github.com/ServiceWeaver/weaver".Unrouted value in variable declaration: *app does not implement "github.com/ServiceWeaver/weaver".Unrouted (missing method routedBy)
./main.go:12:22: *app does not satisfy "github.com/ServiceWeaver/weaver".PointerToMain[app] (missing method implements)

好了,大致了解Weaver应用的结构后,我们来运行一下这个示例:

$go mod tidy
go: finding module for package github.com/ServiceWeaver/weaver
go: found github.com/ServiceWeaver/weaver in github.com/ServiceWeaver/weaver v0.21.2
go: downloading modernc.org/ccgo/v3 v3.16.13
go: downloading modernc.org/cc/v3 v3.40.0
go: downloading lukechampine.com/uint128 v1.2.0
go: downloading modernc.org/token v1.0.1

$weaver generate .
$go run .
╭───────────────────────────────────────────────────╮
│ app        : hello                                │
│ deployment : ca0fcdf2-d9bc-456b-a668-159688e3cca5 │
╰───────────────────────────────────────────────────╯
Hello, World

我们看到,在go run执行之前,我们通过weaver generate命令生成一些代码,这些生成的代码放在了weaver_gen.go中,有100多行,是weaver应用运行所必须的stub代码。

hello, world虽然简单易懂,但对Weaver的核心抽象:逻辑组件(component)的体现并不明显,我们再来看一个复杂一些的例子。

3. 一个http服务器例子

我们来实现一个http服务器的例子,下面是这个例子的组件逻辑拓扑结构:

从图中可以看到,这个实例程序一共有三个weaver component:main组件(listener)、reverser组件(用于将输入的字符串反转)和converter组件(用于将输入的字符串变成大写字符串)。

reverser组件和converter组件都比较简单,每个组件对应的接口仅有一个方法,它们的代码如下:

// serviceweaver-examples/httpserver/reverser.go

package main

import (
    "context"

    "github.com/ServiceWeaver/weaver"
)

// Reverser component.
type Reverser interface {
    Reverse(context.Context, string) (string, error)
}

// Implementation of the Reverser component.
type reverser struct {
    weaver.Implements[Reverser]
}

func (r *reverser) Reverse(_ context.Context, s string) (string, error) {
    runes := []rune(s)
    n := len(runes)
    for i := 0; i < n/2; i++ {
        runes[i], runes[n-i-1] = runes[n-i-1], runes[i]
    }
    return string(runes), nil
}

// serviceweaver-examples/httpserver/converter.go

package main

import (
    "context"
    "strings"

    "github.com/ServiceWeaver/weaver"
)

// Converter component.
type Converter interface {
    ToUpper(context.Context, string) (string, error)
}

// Implementation of the Converter component.
type converter struct {
    weaver.Implements[Converter]
}

func (r *converter) ToUpper(_ context.Context, s string) (string, error) {
    return strings.ToUpper(s), nil
}

接下来,我们实现这个示例的实现weaver.Main接口的app类型:

// serviceweaver-examples/httpserver/main.go

type app struct {
    weaver.Implements[weaver.Main]
    reverser  weaver.Ref[Reverser]
    converter weaver.Ref[Converter]
    lis       weaver.Listener
}

这里app结构体类型通过weaver.Ref嵌入了实现了另外两个组件接口的组件实例,Ref函数的定义如下:

// Ref[T] is a field that can be placed inside a component implementation
// struct. T must be a component type. Service Weaver will automatically
// fill such a field with a handle to the corresponding component.
type Ref[T any] struct {
    value T
}

// Get returns a handle to the component of type T.
func (r Ref[T]) Get() T { return r.value }

此外,通过泛型类型Ref的Get方法,可以获得对相应组件的访问权。

app结构体类型中还包含了一个weaver.Listener类型的实例,Listener理论上并非组件,而是Weaver框架提供了网络服务端口监听的实现,可以放置在任何提供网络服务的组件实现内部,比如本示例的app这个main组件。app将reverser、converter和listener聚合在一起,为后续的serve函数实现提供支持。

接下来,我们看看serve函数的实现:

// serviceweaver-examples/httpserver/main.go

func serve(ctx context.Context, app *app) error {
    // The lis listener will listen on a random port chosen by the operating
    // system. This behavior can be changed in the config file.
    fmt.Printf("http listener available on %v\n", app.lis)

    // Serve the /reverse endpoint.
    http.HandleFunc("/reverse", func(w http.ResponseWriter, r *http.Request) {
        name := r.URL.Query().Get("name")
        if name == "" {
            name = "World"
        }
        reversed, err := app.reverser.Get().Reverse(ctx, name)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        fmt.Fprintf(w, "after reversing, name is %s\n", reversed)
    })
    // Serve the /convert endpoint.
    http.HandleFunc("/convert", func(w http.ResponseWriter, r *http.Request) {
        name := r.URL.Query().Get("name")
        if name == "" {
            name = "World"
        }
        converted, err := app.converter.Get().ToUpper(ctx, name)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        fmt.Fprintf(w, "after converting, name is %s\n", converted)
    })
    return http.Serve(app.lis, nil)
}

我们看到serve函数定义了两个端点/reverse和/convert的Handler函数,并通过http.Serve启动了一个http服务器,http服务器返回,应用退出,否则http服务将一直运行。

我们来运行一下这个程序:

$cd serviceweaver-examples/httpserver
$go mod tidy
$weaver generate .
$go run .
╭───────────────────────────────────────────────────╮
│ app        : httpserver                           │
│ deployment : 55827837-896f-4060-88c2-f1f1d953d142 │
╰───────────────────────────────────────────────────╯
http listener available on [::]:59493

我们看到,示例中的httpserver启动后在59493这个端口监听客户端的连接,我们用curl工具来测试一下:

$curl "http://localhost:59493/convert?name=abcdefg"
after converting, name is ABCDEFG
$curl  "http://localhost:59493/reverse?name=abcdefg"
after reversing, name is gfedcba

我们看到,无论是reverser组件还是converter组件工作都正常。

由于我们没有指定端口,59493是一个随机端口。如果要指定监听的地址和端口,我们可以借助weaver提供的toml格式的配置文件来实现:

// weaver.toml
[single]
listeners.lis = {address = "localhost:8080"}

基于weaver.toml配置文件启动httpserver的命令如下:

$SERVICEWEAVER_CONFIG=weaver.toml go run .
╭───────────────────────────────────────────────────╮
│ app        : httpserver                           │
│ deployment : ee49694c-4935-4f44-96f3-cc7d1d0167ae │
╰───────────────────────────────────────────────────╯
http listener available on 127.0.0.1:8080

在这种模式下启动的httpserver,所有组件都会在一个单一的进程中,组件间的通信通过方法调用进行。这种单体程序在单个进程中部署运行的方式称为single process部署模式,十分适合开发者对程序的开发与调试。weaver为这种方式提供了专门的子命令single,我们可以通过single命令在单进程启动httpserver,不过我们要修改一下weaver.toml:

// weaver.toml

[single]
listeners.lis = {address = "localhost:8080"}

[serviceweaver]
binary = "./httpserver"

无论是single子命令,还是后面即将讲到的multi,都是基于一个可执行文件进行的,因此我们要将httpserver这个示例编译为一个可执行文件”httpserver”,我已经将编译命令放入Makefile,大家输入make命令执行即可。

有了可执行的二进制文件httpserver后,我们就可以使用single子命令启动单进程版的httpserver了:

$weaver single deploy weaver.toml
╭───────────────────────────────────────────────────╮
│ app        : httpserver                           │
│ deployment : ad7c0341-d5d2-4182-8944-306d7682e708 │
╰───────────────────────────────────────────────────╯
http listener available on 127.0.0.1:8080

在开篇讲Service Weaver的三个核心原则时提到,基于Weaver的应用既可以跑在一个进程中,也可以部署在多个进程,以及云提供商的k8s环境中,下面我们就来看看weaver应用的部署,先来将单进程部署模式改为本地多进程部署模式。

4. 部署

基于Weaver应用的部署方式与编码完全解耦,我们无需修改源码便可以实现多进程部署。唯一要做的就是改改weaver.toml,新增多进程部署模式下应用的监听地址信息:

// weaver.toml
[single]
listeners.lis = {address = "localhost:8080"}

[serviceweaver]
binary = "./httpserver"

[multi]
listeners.lis = {address = "localhost:8080"} // 新增

接下来使用下面命令,我们就可以将httpserver以多进程的形式启动起来:

$weaver multi deploy weaver.toml
╭───────────────────────────────────────────────────╮
│ app        : httpserver                           │
│ deployment : bd689290-4929-47f1-a0f0-774d5e1a9307 │
╰───────────────────────────────────────────────────╯
S1003 18:51:02.042859 stdout               ac04576d                      │ http listener available on 127.0.0.1:8080
S1003 18:51:02.043210 stdout               c03c4eed                      │ http listener available on 127.0.0.1:8080

weaver multi子命令提供了查看httpserver多进程启动后状态的方法:

$weaver multi status
╭──────────────────────────────────────────────────────────╮
│ DEPLOYMENTS                                              │
├────────────┬──────────────────────────────────────┬──────┤
│ APP        │ DEPLOYMENT                           │ AGE  │
├────────────┼──────────────────────────────────────┼──────┤
│ httpserver │ bd689290-4929-47f1-a0f0-774d5e1a9307 │ 1m3s │
╰────────────┴──────────────────────────────────────┴──────╯
╭───────────────────────────────────────────────────────────────╮
│ COMPONENTS                                                    │
├────────────┬────────────┬──────────────────────┬──────────────┤
│ APP        │ DEPLOYMENT │ COMPONENT            │ REPLICA PIDS │
├────────────┼────────────┼──────────────────────┼──────────────┤
│ httpserver │ bd689290   │ weaver.Main          │ 30194, 30195 │
│ httpserver │ bd689290   │ httpserver.Converter │ 30198, 30199 │
│ httpserver │ bd689290   │ httpserver.Reverser  │ 30196, 30197 │
╰────────────┴────────────┴──────────────────────┴──────────────╯
╭─────────────────────────────────────────────────────╮
│ LISTENERS                                           │
├────────────┬────────────┬──────────┬────────────────┤
│ APP        │ DEPLOYMENT │ LISTENER │ ADDRESS        │
├────────────┼────────────┼──────────┼────────────────┤
│ httpserver │ bd689290   │ lis      │ 127.0.0.1:8080 │
╰────────────┴────────────┴──────────┴────────────────╯

在status输出的信息中,我们能看到deployment(部署)信息、组件(components)信息以及listener信息。从组件信息来看,weaver multi子命令将每个component放入了一个单独进程,包括main component,并且每个component的副本数(replica)为2,即一共启动了6个进程。从下面ps命令的输出结果也能印证这点:

$ps -ef|grep httpserver
  501 30194 30193   0  6:51下午 ttys006    0:00.05 /Users/tonybai/test/go/service-weaver/httpserver/httpserver
  501 30195 30193   0  6:51下午 ttys006    0:00.05 /Users/tonybai/test/go/service-weaver/httpserver/httpserver
  501 30196 30193   0  6:51下午 ttys006    0:00.07 /Users/tonybai/test/go/service-weaver/httpserver/httpserver
  501 30197 30193   0  6:51下午 ttys006    0:00.04 /Users/tonybai/test/go/service-weaver/httpserver/httpserver
  501 30198 30193   0  6:51下午 ttys006    0:00.05 /Users/tonybai/test/go/service-weaver/httpserver/httpserver
  501 30199 30193   0  6:51下午 ttys006    0:00.04 /Users/tonybai/test/go/service-weaver/httpserver/httpserver

在multi process这种模式下,应用的各个组件由于不在同一进程内,它们之间的通信由基于方法调用改为了基于RPC调用的方式。

weaver multi还提供了以web形式查看应用运行状态的命令:dashboard

$weaver multi dashboard
Dashboard available at: http://127.0.0.1:62183

weaver multi dashboard命令会自动打开浏览器并展示httpserver的各种运行信息和状态信息:

点击页面上的Deployment超链接,我们将进入到下面的页面中:

除此之外,页面最下方还有一个展示组件拓扑以及组件间traffic的图:

通过上图我们知道,reverse端点和convert端点分别接到过2次和1次请求。

注:web状态页面上的traces由于没有开启trace,会暂无数据。

和weaver multi一样,weaver ssh可以实现多机器部署,weaver kube实现基于k8s的部署,weaver gke实现在Google Kubernetes Engine上的部署,这里的multi、ssh、kube等都可以称为deployer。single、multi、ssh是weaver内置支持的,而其他weaver 则是调用weaver-完成的,比如:weaver gke status将调用weaver-gke status命令。

注:由于手里没有现成的kubernetes环境,weaver kube命令无法展示了。

到这里,我们已经践行了Service Weaver的两大核心原则:开发阶段以单体程序形式编码开发,以及运行时通过不同deployer(multi、ssh、k8s等)来实现部署环境与代码的解耦。到这里,你是否体会到了本文题目“以单体形式编码,以微服务形式部署”的深意了呢!

下面我们再来看看Weaver核心原则的第3条:原子升级。

5. 升级

对于使用go run或weaver multi deployment部署的应用程序来说,避免升级过程中的跨版本通信是轻而易举的事,因为每个部署都是独立运行的。

我本地没有Kubernetes环境,也没有GKE的账号,那么如何验证weaver的原子升级过程呢?好在weaver提供了gke-local,即在本地建立一个模拟gke环境,我们可以使用这种方式来看看通过weaver如何实现app的原子升级。

首先我们要执行下面命令单独安装weaver-gke-local:

$go install github.com/ServiceWeaver/weaver-gke/cmd/weaver-gke-local@latest

在我的机器和网络环境下,这个安装过程略显“漫长”,因为要拉取很多依赖的go module,还包括像k8s、k8s client这样的go module。

安装好weaver-gke-local后,我们基于httpserver建立一个新module:httpserver-upgrade。然后修改其weaver.toml,增加gke和rollout相关配置:

// serviceweaver-examples/httpserver-upgrade/weaver.toml

[single]
listeners.lis = {address = "localhost:8080"}

[serviceweaver]
binary = "./httpserver"
rollout = "5m" # Perform five minutes slow rollout.

[multi]
listeners.lis = {address = "localhost:8080"}

[gke]
regions = ["us-west1"]
listeners.lis = {public_hostname = "hello.com"}

然后,为了区分不同版本,我在main.go中为各个端点的处理handler加上了一些带有版本信息的日志,并重新执行make构建新的可执行文件。

下面我们就在gke-local环境下首次部署httpserver:

$weaver gke-local deploy weaver.toml
Deploying the application... Done
Version "b343b4de-bb84-4bd7-8bc0-09eb0054b07d" of app "httpserver" started successfully.
Note that stopping this binary will not affect the app in any way.
Tailing the logs...
S1004 06:33:14.621470 stdout               ea68b26c                      │ http v1 listener available on http://localhost:8000
S1004 06:33:14.627226 stdout               be97798d                      │ http v1 listener available on http://localhost:8000

我们可以ctrl+c结束weaver gke-local deploy这个命令的执行,但一旦部署成功,即便这个命令退出,已经部署的程序依然会运行。

^CTo continue watching the logs, run the following command:

    weaver gke-local logs --follow 'version == "b343b4de"'

并且按照上述提示,我们可以继续执行下面命令来tail整个应用的输出日志:

$weaver gke-local logs --follow 'version == "b343b4de"'
S1004 06:33:14.621470 stdout               ea68b26c                      │ http v1 listener available on http://localhost:8000
S1004 06:33:14.627226 stdout               be97798d                      │ http v1 listener available on http://localhost:8000

和multi子命令在本地多进程部署一样,在gke-local下部署后,我们也可以使用status查看应用部署信息和状态:

$weaver gke-local status
╭────────────────────────────────────────────────────────────────────╮
│ Deployments                                                        │
├────────────┬──────────────────────────────────────┬───────┬────────┤
│ APP        │ DEPLOYMENT                           │ AGE   │ STATUS │
├────────────┼──────────────────────────────────────┼───────┼────────┤
│ httpserver │ b343b4de-bb84-4bd7-8bc0-09eb0054b07d │ 4m55s │ ACTIVE │
╰────────────┴──────────────────────────────────────┴───────┴────────╯
╭─────────────────────────────────────────────────────────────────────╮
│ COMPONENTS                                                          │
├────────────┬────────────┬──────────┬──────────────────────┬─────────┤
│ APP        │ DEPLOYMENT │ LOCATION │ COMPONENT            │ HEALTHY │
├────────────┼────────────┼──────────┼──────────────────────┼─────────┤
│ httpserver │ b343b4de   │ us-west1 │ weaver.Main          │ 2/2     │
│ httpserver │ b343b4de   │ us-west1 │ httpserver.Converter │ 2/2     │
│ httpserver │ b343b4de   │ us-west1 │ httpserver.Reverser  │ 2/2     │
╰────────────┴────────────┴──────────┴──────────────────────┴─────────╯
╭─────────────────────────────────────────────────────────────────────────────────────────────╮
│ TRAFFIC                                                                                     │
├───────────┬────────────┬────────────┬────────────┬──────────┬────────────┬──────────────────┤
│ HOST      │ VISIBILITY │ APP        │ DEPLOYMENT │ LOCATION │ ADDRESS    │ TRAFFIC FRACTION │
├───────────┼────────────┼────────────┼────────────┼──────────┼────────────┼──────────────────┤
│ hello.com │ public     │ httpserver │ b343b4de   │ us-west1 │ [::]:62559 │ 0.5              │
│ hello.com │ public     │ httpserver │ b343b4de   │ us-west1 │ [::]:62564 │ 0.5              │
╰───────────┴────────────┴────────────┴────────────┴──────────┴────────────┴──────────────────╯
╭────────────────────────────╮
│ ROLLOUT OF httpserver      │
├─────────────────┬──────────┤
│                 │ us-west1 │
├─────────────────┼──────────┤
│ TIME            │ b343b4de │
│ Oct  3 22:37:59 │ 1.00     │
╰─────────────────┴──────────╯

我们看到整个应用被模拟部署到us-west1 region,每个组件有两个副本,用ps命令查看,我们也能看到6个进程:

$ps -ef|grep httpserver
  501 38480 35224   0  6:33上午 ttys006    0:00.13 /Users/tonybai/test/go/service-weaver/httpserver-upgrade/httpserver
  501 38481 35224   0  6:33上午 ttys006    0:00.11 /Users/tonybai/test/go/service-weaver/httpserver-upgrade/httpserver
  501 38482 35224   0  6:33上午 ttys006    0:00.10 /Users/tonybai/test/go/service-weaver/httpserver-upgrade/httpserver
  501 38483 35224   0  6:33上午 ttys006    0:00.10 /Users/tonybai/test/go/service-weaver/httpserver-upgrade/httpserver
  501 38484 35224   0  6:33上午 ttys006    0:00.10 /Users/tonybai/test/go/service-weaver/httpserver-upgrade/httpserver
  501 38485 35224   0  6:33上午 ttys006    0:00.10 /Users/tonybai/test/go/service-weaver/httpserver-upgrade/httpserver

现在我们可以使用curl命令来验证一下应用的可用性:

$curl  --header 'Host: hello.com' "http://localhost:8000/reverse?name=abcdefg"
after reversing-v1, name is gfedcba
$curl  --header 'Host: hello.com' "http://localhost:8000/reverse?name=abcdefg"
after reversing-v1, name is gfedcba
$curl  --header 'Host: hello.com' "http://localhost:8000/reverse?name=abcdefg"
after reversing-v1, name is gfedcba
$curl  --header 'Host: hello.com' "http://localhost:8000/convert?name=abcdefg"
after converting-v1, name is ABCDEFG
$curl  --header 'Host: hello.com' "http://localhost:8000/convert?name=abcdefg"
after converting-v1, name is ABCDEFG

可以看到,app工作正常!

此外,我们还可以通过dashboard可以以图形化的方式观测app状态(weaver gke-local dashboard),在后续升级过程中,通过dashboard可以清楚地看到整个升级过程:

注:gke-local会在本地建立一个模拟load balancer,并将发到hello.com主机的请求按Traffic Fraction分发给不同副本。

接下来,我们就来开发httpserver的v2版本,将main.go中的version改为v2,然后重新编译httpserver,执行下面命令部署新版httpserver:

$weaver gke-local deploy weaver.toml
Deploying the application... Done
Version "2ee38e73-323f-4b42-b115-ee5bc40a8c09" of app "httpserver" started successfully.
Note that stopping this binary will not affect the app in any way.
Tailing the logs...
S1004 06:50:12.575585 stdout               702058ba                      │ http v2 listener available on http://localhost:8000
S1004 06:50:12.586352 stdout               ef3d7c3f                      │ http v2 listener available on http://localhost:8000

^CTo continue watching the logs, run the following command:

    weaver gke-local logs --follow 'version == "2ee38e73"'

由于我们配置的rollout为5分钟,所以新版httpserver替换掉旧版httpserver的过程会持续5分钟。而这个过程中load balancer针对新旧两个版本的Traffic Fraction也会动态调整:旧版本会逐渐降低,新版本会逐渐升高:



这时向app发送的请求,既可能由v1版本处理,也可能由v2版本处理:

$curl  --header 'Host: hello.com' "http://localhost:8000/convert?name=abcdefg"
after converting-v1, name is ABCDEFG
$curl  --header 'Host: hello.com' "http://localhost:8000/reverse?name=abcdefg"
after reversing-v1, name is gfedcba
$curl  --header 'Host: hello.com' "http://localhost:8000/convert?name=abcdefg"
after converting-v1, name is ABCDEFG
$curl  --header 'Host: hello.com' "http://localhost:8000/reverse?name=abcdefg"
after reversing-v2, name is gfedcba

最后新版app将全面接手对请求的处理:

之后,旧版的app将被delete掉:

这样新版app的升级部署(rollout)就结束了!rollout后,所有请求将被v2版本处理,应答中将带有v2字样。

在新版本升级的过程中,你如果使用ps查看httpserver进程数量,你会发现数量多出一倍,那是因为整个rollout过程采用的是蓝绿部署方式,即完全部署一套新app,然后通过调整load balancer的分发比例,让新版app逐渐承担全部流量,而在这个过程中,不会存在新老版本组件交互的情况出现。下图展示了这一过程:

注:如果要杀掉app,可以用weaver gke-local kill httpserver命令。

6. 小结

Service Weaver是一个优秀的框架,可以帮助开发人员以单体形式快速构建、以微服务形式快速部署分布式应用,其三个核心原则的创新思路值得我们学习借鉴。

但Service Weaver也不是万能的,Service Weaver主要针对在线的分布式服务系统,即需要在用户请求到达时处理它们的系统,例如网络应用程序或API Server正是此类分布式服务系统。基于Weaver开发这类系统,应用可以轻松获取网络Listener并建立HTTP 服务器,应用可以支持原子升级,且应用组件的副本数量可以根据请求压力的大小自动扩缩(本文并未演示这个特性)。

不过要注意的是:Service Weaver仅仅开源了几个月,其API尚未Stable,本文中的示例基于v0.21.2版本实现,也许在未来的某个时间点,这些示例可能会因API的变化而无法Run起来, status命令和dashboard命令所展现给用户的样式也会发生变化。另外学习weaver本身也是有学习成本的,weaver自身的代码由于采用了泛型和反射,读起来也是很晦涩。

综上,Service Weaver所践行的理念的优秀的,但考虑其成熟度以及Go社区崇尚的“The Best Go framework is no framework”的信条,选择引入Service Weaver框架之前务必要仔细斟酌。

本文涉及的Go源码,可以在这里下载。

7. 参考资料


“Gopher部落”知识星球旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!2023年,Gopher部落将进一步聚焦于如何编写雅、地道、可读、可测试的Go代码,关注代码质量并深入理解Go核心技术,并继续加强与星友的互动。欢迎大家加入!

img{512x368}
img{512x368}

img{512x368}
img{512x368}

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻) – https://gopherdaily.tonybai.com

我的联系方式:

微博(暂不可用):https://weibo.com/bigwhite20xx
微博2:https://weibo.com/u/6484441286
博客:tonybai.com
github: https://github.com/bigwhite
Gopher Daily归档 – https://github.com/bigwhite/gopherdaily

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

使用Go和WebRTC data channel实现端到端实时通信

本文永久链接 – https://tonybai.com/2023/09/23/p2p-rtc-implementation-with-go-and-webrtc-data-channel

关于实时通信(RTC,Real-Time Communication),我和大多数人一样,是用的多(比如网络电话、音视频会议等),但对RTC概念和其底层技术原理了解的却不多。近期,一项目恰用到了RTC技术,我就顺便翻阅了一些资料,并用Go建立了一个端到端数据通信的小demo,这里给大家分享一下。

1. RTC与WebRTC

1.1 实时通信(Real-Time Communication)

实时通信(RTC)是实时发生的任何在线通信。生活中,最常见的采用实时通信方式的例子就是电话:一旦双方接通后,数据便直接从发送方即时发送到接收方,不会存储在前往目的地的途中

而传统的邮件以及互联网电子邮件则并非实时通信,因为在邮件/电邮的场景下,我们发送数据后,对方通常要等待一段时间才能收到数据,同时我们也需要等待一段时间才能收到回复。相信这个反例可以更好地帮助大家理解实时通信的特点。

总结一下,实时通信具有以下特点(想象一下打电话的过程):

  • 存在接通的过程
  • 点对点(通常没有中间存储或处理节点)
  • 传输低延迟

1.2 WebRTC技术的诞生

显然RTC技术是一种能给人们生活带来极大便捷的技术,尤其是在音视频实时传输方面,但很长时间以来,实时通信技术都十分复杂,还有专利门槛,将实时通信技术与业务结合既非常困难,又十分耗时,并且即便大力投入也未必能取得很好的效果,通常只有大厂才有这个能力实现稍完善的RTC方案和产品。

此外,随着Web技术的兴起、移动互联网时代的到来、4G/5G和宽带技术的蓬勃发展,人们都迫切希望将实时通信技术与Web等技术融合在一起,通过浏览器或智能终端即可快速建立音视频的实时数据通信。

于是2009年谷歌出手了!

  • 2009年,谷歌提出了创建WebRTC的概念,作为Adobe Flash以及无法在浏览器中运行的桌面应用程序的替代方案。
  • 2010年,谷歌收购了大量提供RTC技术授权的公司。
  • 2011年,谷歌开源了WebRTC项目
  • 2011年末,W3C发布第一个WebRTC规范草案。
  • 2013年,谷歌和Mozilla展示了基于WebRTC的异构浏览器之间的视频通话。
  • 2017年,WebRTC进入候选推荐标准(Candidate Recommendation,CR)阶段。
  • 2021年初,WebRTC成为W3C正式推荐标准及IETF标准

如今,WebRTC已经广泛用在了在线教育、电商直播、泛娱乐社交等应用领域。

1.3 WebRTC简明介绍

WebRTC(Web Real-Time Communication)是一套开源的点对点实时通信技术,最初为Web打造,旨在让Web应用可以直接在浏览器中进行实时的音视频通信和数据交换,而无需安装第三方插件。WebRTC具体体现为一组开源协议、引擎和API。

下面是W3C出品的WebRTC的技术栈的架构图(来自https://webrtc.github.io/webrtc-org/architecture/):

我们看到WebRTC还是蛮复杂的,涉及到多类API、会话/信令管理、音频编解码算法引擎、视频编解码算法引擎、包含多种协议的传输层以及底层音视频捕捉和渲染等。全面掌握WebRTC全技术栈是很困难的,好在上面的架构图将不同领域的开发者的关注点做了标记,大多数开发者关注WebRTC API和Web API即可。并且,随着WebRTC自身的演进,目前WebRTC已经不局限于浏览器,可以应用于其他各种应用程序。在Go社区,最知名的WebRTC类项目莫过于pion了,它提供了纯Go的WebRTC API实现,任何Go应用都可以使用pion的WebRTC API开发点对点实时通信应用。

1.4 WebRTC相关的协议

WebRTC并没有全部另立炉灶从头建立很多新协议,而是复用了很多成熟的网络协议和应用协议,尤其是涉及数据传输的协议。下图是WebRTC中使用的一些重要协议分布图:


图改自《WebRTC技术详解》一书

很多协议大家都非常熟悉,比如HTTP、WebSocket、TLS、TCP、UDP等,但也有些协议是大家比较陌生的,如RTP/SRTP、SCTP等,针对这些陌生协议,我们下面简要介绍一下:

1.4.1 RTP(Real-time Transport Protocol,实时传输协议)和SRTP(Secure RTP)

RTP协议支持通过IP网络实时传输音频和视频。RTP常用于流媒体服务的通信系统,例如网络电话、视频电话会议等系统。RTP也是WebRTC使用的最重要的协议之一,在WebRTC中,RTP用于在WebRTC客户端(比如浏览器)之间传输音频和视频媒体(media)数据包。

RTP是专为流媒体的端到端实时传输设计的,更关注信息的实时性,可以避免出现因网络传输丢失数据造成通话质量下降的情况。并且,如上图所示,RTP都是基于UDP构建的,并额外提供抖动补偿、包丢失检测和无序传递检测的功能。

此外,RTP在传递媒体流时会为每个媒体流建立一个会话,即音频和视频流各自使用单独的RTP会话,这样接收端就能有选择性地接收媒体流(音频、视频或音视频)。

基础的RTP没有内置任何安全机制,因此不能保证传输数据的安全性,这样端与端之间通信传输未加密的数据时,都有可能被第三方拦截并窃取。为此,WebRTC规范明确禁止使用未加密的RTP,而是使用安全增强后的SRTP(Secure RTP)。SRTP可以为单播和多播应用程序中的RTP数据提供加密、消息身份验证和完整性以及重放攻击保护等安全功能。

注:对于非音频或视频数据,WebRTC不使用RTP,而是在通信的两端建立一个data channel用于交换任意格式的数据。

1.4.2 SCTP(Stream Control Transmission Protocol,SCTP)

WebRTC的端与端建立连接后,音视频数据的交互由RTP/SRTP协议完成,但非音视频数据,则由两端之间建立的数据通道(data channel)完成。数据通道支持传输字符串、文件、图片等数据。

数据通道API的使用方式与WebSocket非常相似,但是WebSocket运行于TCP之上,而WebRTC数据通道的底层传输使用了DTLS/UDP,具有较高的安全性,上层则是使用SCTP,默认使用可靠且有序的方式进行数据传输。

SCTP是在2000年由IETF的SIGTRAN工作组定义的一个传输层协议。它是面向连接、端到端、全双工、带有流量和拥塞控制的可靠传输协议,本来与TCP和UDP处于同一级别,可以直接运行在IP之上。只是在WebRTC中,它被用在了应用层。

WebRTC充分利用了SCTP的面向消息(非tcp那样的面向流)的、带有拥塞控制算法的可靠传输机制,同时SCTP支持在一个传输通道中关联多个流的特性,这样每个流可以单独处理,甚至可以具有不同的可靠性属性。流与流之间不存在线头阻塞问题。流由流编号标识,可以在一定程度上提供多路复用功能,而无需开多个SCTP连接。

1.4.3 SDP(Session Description Protocol, 会话描述协议)

SDP是一种文本形式的会话描述协议,用于描述多媒体会话的参数。

SDP是WebRTC端与端建立连接过程中必须要使用的协议。WebRTC使用SDP来描述对等连接的两端的媒体特征,包括会话属性、会话活动的时间、会话包含的媒体信息、媒体编/解码器、媒体地址和端口信息以及网络带宽的信息等。

下面是SDP协议内容的一个典型例子(来自https://developer.mozilla.org/en-US/docs/Glossary/SDP):

v=0
o=alice 2890844526 2890844526 IN IP4 host.anywhere.com
s=
c=IN IP4 host.anywhere.com
t=0 0
m=audio 49170 RTP/AVP 0
a=rtpmap:0 PCMU/8000
m=video 51372 RTP/AVP 31
a=rtpmap:31 H261/90000
m=video 53000 RTP/AVP 32
a=rtpmap:32 MPV/90000

WebRTC的两个端在使用RTP/SRTP传输音视频数据或使用SCTP传输data channel数据之前,需要先建立连接。建立连接的过程类似于传统电话从拨号、呼叫等待、到接通的过程。这个过程通常会有一个叫信令服务器(signaling server)的中间角色(好比文首配图的人工电话交换机)参与。而SDP在建连过程中起着重要作用,信令服务器会将两端的SDP转发给另一方,直到两端都拥有了自己和对方的会话描述信息(SDP承载),并在媒体交换格式方面达成了一致,这是两端连接成功的前提。

注:SDP不是WebRTC专属的,SDP在很多领域有广泛应用,最常见的就是即时通信(IM)领域。

1.4.4 STUN、TURN和ICE

使用WebRTC进行实时通信的两端通常都位于防火墙或NAT之后的“内网”,只有很少部分主机能够拥有独立的公网IP而直接接入Internet。也就是说,尝试建立连接的双方由于位于NAT网络之中,不能直接使用内网IP地址建立网络连接。WebRTC于是使用“NAT穿透技术(俗称打洞)”来帮助两端建立连接。

STUN就是一种最常见的NAT穿透协议,其全称是“Simple Traversal of UDP Through NATs”,即简单的用UDP穿透NAT。STUN本质上是一种公网地址及端口的发现协议,客户端向STUN服务器发送请求,STUN服务器返回客户端的公网地址及NAT网络信息。这些信息用于构建在ICE打洞时的候选地址,并由信令服务器转发给另一端。

不过STUN无法应对所有NAT网路情形,在对称NAT(映射的外网地址端口号不固定,会随着目的地址的变化而变化)情况下,WebRTC用户无法使用STUN协议建立P2P连接,这种情况就需要借助TURN协议提供的服务进行流量中转。

TURN(Traversal Using Relays around NAT)是一种通过数据转发的方式穿透NAT的,解决了防火墙和对称NAT的问题。TURN支持UDP和TCP协议。

注:使用STUN建立的是P2P的网络模型,网络连接直接建立在通信两端,没有中间服务器介入;而使用TURN建立的是流量中继的网络模型,用户两端都与TURN服务建立连接,用户的网络数据包通过TURN服务进行转发 — 《WebRTC技术详解》

我们看到,TURN与STUN的共同点都是通过修改应用层中的私网地址达到NAT穿透的效果,不同点是TURN是通过两方通讯的“中间人”方式实现穿透。但TURN与其他中继控制协议也有不同,它能够允许一个客户端使用一个中继地址与多个对端连接。

ICE(Interactive Connectivity Establishment, 交互式连接建立)跟STUN和TURN不一样,ICE不是一种协议,而是一个框架(Framework),它整合了STUN和TURN,并利用STUN和TURN服务器来帮助两端建立起连接。

WebRTC的一端通过ICE获得的每个网络信息都会被包装成一个ICE候选者(candidate)。ICE候选者描述了用于建立网络连接的网络信息,包含网络协议、IP地址、端口等。如果设备上有多个IP地址,那么每个IP地址都会对应一个候选。例如设备A上有内网IP地址IP-1,还有公网IP地址IP-2,A通过IP-1可以直接与B进行通信,但是WebRTC不会判断优先使用哪个IP地址,而是同样从两个IP地址收集候选,并将候选信息通过信令服务器转发给另一端。

ICE候选者有多种类型(以基于UDP传输为例),包括host(本机候选)、srflx(映射候选)、relay(中继候选)和prflx(来自对称NAT的映射候选)。类型有优先级次序,其中host优先级最高,relay优先级最低。比如WebRTC收集到了两个候选者,一个是host类型,另一个是srflx类型,那么WebRTC一定会先尝试与host类型的Candidate建立连接,如果不成功,才会使用srflx类型的Candidate。

当两端都得到自己和对方的ICE候选信息后,就会进行ICE候选配对,并最终选出一个用于建立端与端连接的ICE候选者对(pair),最终两端将基于这个候选者对中的网络信息建立了P2P的连接。

有了上面协议这层铺垫后,接下来我们再来看WebRTC建立连接的流程就容易多了。

1.5 WebRTC的建连流程

下面是WebRTC的典型建连流程图:

如图所示,WebRTC端到端建立连接的第一步是与信令服务器建立连接并交换SDP信息。

信令服务器通常位于两端都能访问到的公网。当WebRTC一端启动后,它可能不知道要与谁通信,或者仅知道对方的极少的信息(比如一个ID),信令服务器可以帮助参与通信的两端解决这个问题。就像前面说的,你可以将信令服务器看作是电话人工交换机及其操作员,它可以帮助参与通信的两端找到彼此。WebRTC并未将信令服务器以及信令协议标准化,因为信令服务器是“业务相关”的,究竟是建立一对一连接,还是建立群聊,这些由信令服务器的业务来决定。承载信令的协议可以是普通的HTTP,也可以是WebSocket,亦可是像XMPP那样的专用信令协议。

在WebRTC中,主动发起连接的一方会创建offer,并通过信令服务器将offer转发给另一方;另一方收到offer后会创建answer,并同样通过信令服务器转发给发起方。无论是offer,还是answer,都包含了各自的SDP信息。

第二步,当交换SDP后,两端各自发起ICE过程,向STUN/TURN服务器发起请求,获取各自NAT后的公网信息,并形成ICE候选者。

第三步,双方通过信令服务器交换ICE候选者信息

当ICE候选者配对成功后,就来到了第四步,WebRTC两端直接建立连接。连接建立成功后,便可以进行数据传输交换了。

2. WebRTC data channel

上面提到过,WebRTC除了提供了音视频媒体实时通信能力外,还支持可以传输非媒体流数据的数据通道(data channel)

和音视频数据一样,经由WebRTC数据通道进行的数据交换不经过服务器,不受服务器性能及带宽瓶颈的限制,同时减少了数据被拦截的概率。数据通道底层传输使用了DTLS,具有较高的安全性。上层使用SCTP,默认使用可靠且有序的方式进行数据传输。此外,data channel的建连过程与音视频的建连过程也是一致的。

下面我们就来用一个实际的例子展示一下如何使用Go建立基于WebRTC data channel的端到端实时通信。

3. 基于Go和Pion的WebRTC data channel应用示例

通过前面的介绍,我们知道了WebRTC技术栈十分复杂,日常WebRTC应用开发时,我们一般会基于开源的实现进行开发。Go语言在WebRTC开发领域也有比较成熟的开源项目,如Pion。Pion提供了纯Go实现的WebRTC API实现以及WebRTC相关组件实现,使用Pion可以帮助我们快速高效开发WebRTC服务器和客户端应用。

3.1 pion: 纯Go的WebRTC实现

根据pion之父的说法,pion的诞生源于用WebRTC构建东西的挫败感,这种挫败感来源于Google开源的首个webrtc实现libwebrtc,因为将libwebrtc构建和运行起来似乎十分困难。

pion就是根据libwebrtc的教训而设计的,pion给开发者的第一印象就是它十分容易构建和运行起来。这一定程度要归功于pion是用Go编写的,更模块化,也更透明,并且pion之父最初便考虑了将其用在Chromium之外的应用中。

pion是一个纯粹的WebRTC软件的Go集合, 涵盖了WebRTC项目中需要的所有主要元素:

同时,pion项目还为WebRTC开发者贡献了一本非常好的WebRTC资料《WebRTC For The Curious》,很值得一读。另外,pion项目的examples也十分丰富,非常利于初学者快速掌握WebRTC以及如何使用pion开发WebRTC应用。

下面我们就基于pion的webrtc实现项目开发一个基于data channel的端到端实时通信示例。

根据之前对WebRTC建立过程的说明,我们首先需要设计一下这个示例的信令服务器以及信令协议。

3.2 信令服务与协议设计

信令服务器在WebRTC通信中扮演协调者的角色。它传递客户端的媒体参数和连接候选信息。

我们的业务模型是,信令服务器维护一个被动连接的peer集合,这个集合中的peer是在这些peer在启动时通过register信令注册到信令服务器中的,每个peer有一个唯一的ID,我称这个集合为answer peer集合吧。主动连接方(这里称为offer peer)则通过ID去连接answer peer。一旦建立与某个peer的连接后,它们便可以通过建立的data channel全双工的实时通信了。下面是信令服务与offer peer和answer peer的信令交互图:

参照前面提到的WebRTC建连过程,你可以很容易的看懂这个协议设计。

这里我设计了一个Message抽象来表示信令服务可以收发的消息:

//webrtc-data-channel/signaling/proto/proto.go

type Message struct {
    Cmd     int    `json:"command"`
    Payload []byte `json:"payload"` // carry all kinds of request and response
}

其中的Cmd字段标识Message类型,可选值如下:

//webrtc-data-channel/signaling/proto/proto.go

const (
    // originated from answer peer
    CmdInit = iota + 1
    CmdAnswer

    // originated from answer peer
    CmdOffer

    // from both peer
    CmdCandidate
)

const (
    CmdInitResp = iota + 101 // CmdInit + 100
    CmdAnswerResp
    CmdOfferResp
    CmdCandidateResp
)

Message既可以承载Request,亦可以承载Response。Message的Payload字段中存放的是Request或Response序列化后的结果。Request和Response结构如下:

//webrtc-data-channel/signaling/proto/proto.go

// Request is one kind of payload for Message
type Request struct {
    SourceID string `json:"source"`
    TargetID string `json:"target"`
    Body     []byte `json:"body"` // carry register, offer, answer, candidate
}

// Request is another payload for Message
type Response struct {
    Code int    `json:"code"`
    Msg  string `json:"msg"`
}

Request类型的Body中存放的是WebRTC Offer/Answer的SDP以及ICE Candidate序列化后的结果。

此外,在这个示例中,我们使用WebSocket来作为信令协议的载体,便于信令服务器与offer peer/answer peer进行双向通信。

3.3 信令服务器的实现

按照上述设计,我们的信令服务器就是一个websocket的server:

//webrtc-data-channel/signaling/main.go

func main() {
    flag.Parse()
    log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
    http.HandleFunc("/register", register) // for peerAnswer
    http.HandleFunc("/offer", offer)       // for peerOffer
    log.Fatal(http.ListenAndServe(*addr, nil))
}

在这个server中我们提供了两个endpoint,一个是/register,供answer peer建立连接使用;另外一个是/offer,供offer peer与信令服务器建连并通信的。

两个endpoint对应的Handler的处理模式也相对一致,都是进入一个event loop中。

//webrtc-data-channel/signaling/main.go

func register(w http.ResponseWriter, r *http.Request) {
    c, err := upgrader.Upgrade(w, r, nil) // *websocket.Conn
    if err != nil {
        log.Print("signaling: websocket upgrade error:", err)
        return
    }
    defer c.Close()

    err = answerPeerEventLoop(c, w)
    if err != nil {
        log.Println("signaling: answerPeerEventLoop error:", err)
        return
    }
    log.Println("signaling: answerPeerEventLoop exit")
}

func offer(w http.ResponseWriter, r *http.Request) {
    c, err := upgrader.Upgrade(w, r, nil) // *websocket.Conn
    if err != nil {
        log.Print("signaling: websocket upgrade error:", err)
        return
    }
    defer c.Close()

    err = offerPeerEventLoop(c, w)
    if err != nil {
        log.Println("signaling: offerPeerEventLoop error:", err)
        return
    }
    log.Println("signaling: offerPeerEventLoop exit")
}

注:offer和register这两个Handler都会在单独的goroutine中执行。

offerPeerEventLoop和answerPeerEventLoop的代码都比较长,这里就不贴出来了。这两个函数的代码也都比较模式化,基本处理流程就是读取一个Message,判断Message的Cmd类型,然后根据Cmd类型分别处理,处理的逻辑参见上面信令服务器的信令处理流程:基本上就是转发、转发、转发。

3.4. answer peer的实现

answer peer启动后会建立RTCPeerConnection类型实例,并设置RTCPeerConnection实例的事件处理函数:

  • OnICECandidate

本地收集到ICE候选者信息,处理动作是将这些ICE候选者信息通过信令服务转发到对端。

  • OnConnectionStateChange

当与对端的连接状态发生变化时触发,比如连接建立、连接断开时。处理动作仅为输出相应的日志。

  • OnDataChannel

当与对端的Data Channel创建成功时,处理逻辑是注册DataChannel.OnOpen和DataChannel.OnMessage两个事件处理函数。

完成这些后,answer peer会向上面设计的那样,与信令服务器建立连接,并发送请求到信令服务的/register端点,然后进入event loop。在event loop中负责处理信令服务器转发过来的Offer、Candidate等信息,以及各种信令服务器返回的Response。

当收到Offer时,answer peer会创建Answer并发给信令服务器;当收到Candidate时,会调用AddICECandidate将Candidate信息添加到peerConnection中,供后续配对使用。后续WebRTC连接自动建立后,便可以通过data channel收发数据了。

answer peer的代码较长,大家可以自行到https://github.com/bigwhite/experiments/tree/master/webrtc-data-channel/answer阅读。

注:answer peer的代码改编自pion/webrtc项目pion-to-pion/answer示例

3.5. offer peer的实现

offer peer的实现与answer相似。

offer peer启动后会建立RTCPeerConnection类型实例,并设置RTCPeerConnection实例的事件处理函数:

  • OnICECandidate
  • OnConnectionStateChange
  • DataChannel的OnOpen
  • DataChannel的OnMessage

offer peer会主动创建DataChannel,然后与信令服务器建立连接,并发送请求到信令服务的/offer端点并主动向信令服务器发送Offer,最后进入event loop。在event loop中负责处理信令服务器转发过来的Answer、Candidate等信息,以及各种信令服务器返回的Response。

当收到Answer时,offer peer会将Answer中携带的SDP传给SetRemoteDescription,同时调用SetLocalDescription开启ICE候选者的收集过程;当收到Candidate时,会调用AddICECandidate将Candidate信息添加到peerConnection中,供后续配对使用。后续WebRTC连接自动建立后,便可以通过data channel收发数据了。

offer peer的代码较长,大家可以自行到https://github.com/bigwhite/experiments/tree/master/webrtc-data-channel/offer阅读。

注:offer peer的代码改编自pion/webrtc项目pion-to-pion/offer示例

3.6 运行示例

下面我们来运行一下这个示例。

先来启动信令服务器:

$cd webrtc-data-channel/signaling
$go run main.go

启动answer peer:

$cd webrtc-data-channel/answer
$go run main.go
2023/09/23 21:24:45.201213 answer: NewPeerConnection ok
2023/09/23 21:24:45.201256 answer: connecting to ws://localhost:18080/register
2023/09/23 21:24:45.203993 answer: recv resp[101]: proto.Response{Code:0, Msg:"ok"}

这时我们会从信令服务器的输出日志中看到:

2023/09/23 21:24:45.203702 signaling: add answer peer: answer-peer-1

我们看到,answer peer成功注册到信令服务器中了,其ID为answer-peer-1。

下面我们来启动offer peer,其要连接的target为answer-peer-1:

$cd webrtc-data-channel/offer
$go run main.go -target answer-peer-1
2023/09/23 21:25:26.462845 offer: new peerConnection ok
2023/09/23 21:25:26.462880 offer: create new channel
2023/09/23 21:25:26.462890 offer: connecting to ws://localhost:18080/offer
2023/09/23 21:25:26.464863 offer: create offer
2023/09/23 21:25:26.465131 offer: recv resp[103]: proto.Response{Code:0, Msg:"ok"}
2023/09/23 21:25:26.465957 offer: recv answer(sdp) message from answer-peer-1
2023/09/23 21:25:26.466064 offer: set local desc
2023/09/23 21:25:26.466099 offer: set remote desc
2023/09/23 21:25:26.466201 offer: Peer Connection State has changed: connecting
2023/09/23 21:25:26.466297 offer: recv candidate message from answer-peer-1
2023/09/23 21:25:26.466344 offer: invoke peerConnection.OnICECandidate: webrtc.ICECandidate{statsID:"candidate:KsXlIk2JNeiDqK3l+znsoB3sDwuh1/2x", Foundation:"4104056053", Priority:0x7effffff, Address:"192.168.1.105", Protocol:1, Port:0xc2b1, Typ:1, Component:0x1, RelatedAddress:"", RelatedPort:0x0, TCPType:""}
2023/09/23 21:25:26.466506 offer: recv resp[104]: proto.Response{Code:0, Msg:"ok"}
2023/09/23 21:25:26.468342 offer: Peer Connection State has changed: connected
2023/09/23 21:25:26.469105 offer: Data channel 'data'-'824634439080' open. Random messages will now be sent to any connected DataChannels every 5 seconds
2023/09/23 21:25:26.859774 offer: recv candidate message from answer-peer-1
2023/09/23 21:25:31.469811 offer: Sending 'offer-1013426535'
2023/09/23 21:25:31.470846 offer: Message from DataChannel 'data': 'answer-695102175'
2023/09/23 21:25:36.469653 offer: Sending 'offer-2065047193'
2023/09/23 21:25:36.470495 offer: Message from DataChannel 'data': 'answer-750781464'
2023/09/23 21:25:41.469603 offer: Sending 'offer-153497802'
2023/09/23 21:25:41.469938 offer: Message from DataChannel 'data': 'answer-2102723687'
2023/09/23 21:25:46.469504 offer: Sending 'offer-1287609150'
2023/09/23 21:25:46.470097 offer: Message from DataChannel 'data': 'answer-645051512'
2023/09/23 21:25:51.470078 offer: Sending 'offer-1486812657'
2023/09/23 21:25:51.470572 offer: Message from DataChannel 'data': 'answer-1325372035'

offer peer的启动引发了“连锁反应”,在信令服务器的帮助下,offer peer与answer peer成功建立了连接,并在打开的Data Channel进行着“定时”的双工实时通信。

信令服务器的输出日志如下:

2023/09/23 21:25:26.465049 signaling: recv request[3] from offer peer
2023/09/23 21:25:26.465070 signaling: send offer resp ok
2023/09/23 21:25:26.465073 signaling: add offer peer:  offer-peer-1
2023/09/23 21:25:26.465085 signaling: forward request[3] to answer peer ok
2023/09/23 21:25:26.465247 signaling: recv offer response from answer peer
2023/09/23 21:25:26.465868 signaling: recv request[2] from answer peer
2023/09/23 21:25:26.465896 signaling: forward request[2] to offer peer[offer-peer-1] ok
2023/09/23 21:25:26.466003 signaling: recv answer response from offer peer
2023/09/23 21:25:26.466218 signaling: recv request[4] from answer peer
2023/09/23 21:25:26.466245 signaling: forward request[4] to offer peer[offer-peer-1] ok
2023/09/23 21:25:26.466363 signaling: recv candidate response from offer peer
2023/09/23 21:25:26.466415 signaling: recv request[4] from offer peer
2023/09/23 21:25:26.466429 signaling: send offer resp ok
2023/09/23 21:25:26.466435 signaling: add offer peer:  offer-peer-1
2023/09/23 21:25:26.466445 signaling: forward request[4] to answer peer ok
2023/09/23 21:25:26.466526 signaling: recv candidate response from answer peer
2023/09/23 21:25:26.859520 signaling: recv request[4] from answer peer
2023/09/23 21:25:26.859609 signaling: forward request[4] to offer peer[offer-peer-1] ok
2023/09/23 21:25:26.859951 signaling: recv candidate response from offer peer

answer peer的输出日志如下:

2023/09/23 21:25:26.465182 answer: recv offer message from offer-peer-1
2023/09/23 21:25:26.465823 answer: send sdp answer
2023/09/23 21:25:26.465834 answer: Peer Connection State has changed: connecting
2023/09/23 21:25:26.465925 answer: set local desc
2023/09/23 21:25:26.465928 answer: recv resp[102]: proto.Response{Code:0, Msg:"ok"}
2023/09/23 21:25:26.466108 answer: invoke peerConnection.OnICECandidate: 192.168.1.105
2023/09/23 21:25:26.466285 answer: recv resp[104]: proto.Response{Code:0, Msg:"ok"}
2023/09/23 21:25:26.466481 answer: recv candidate message from offer-peer-1
2023/09/23 21:25:26.468475 answer: Peer Connection State has changed: connected
2023/09/23 21:25:26.469002 answer: New DataChannel data 824634440046
2023/09/23 21:25:26.469049 answer: Data channel 'data'-'824634440046' open. Random messages will now be sent to any connected DataChannels every 5 seconds
2023/09/23 21:25:26.859199 answer: invoke peerConnection.OnICECandidate: 175.160.224.151
2023/09/23 21:25:26.859770 answer: recv resp[104]: proto.Response{Code:0, Msg:"ok"}
2023/09/23 21:25:31.470331 answer: Sending 'answer-695102175'
2023/09/23 21:25:31.470366 answer: message from DataChannel 'data': 'offer-1013426535'
2023/09/23 21:25:36.470028 answer: Sending 'answer-750781464'
2023/09/23 21:25:36.470123 answer: message from DataChannel 'data': 'offer-2065047193'
2023/09/23 21:25:41.469624 answer: Sending 'answer-2102723687'
2023/09/23 21:25:41.469978 answer: message from DataChannel 'data': 'offer-153497802'
2023/09/23 21:25:46.469606 answer: Sending 'answer-645051512'
2023/09/23 21:25:46.469883 answer: message from DataChannel 'data': 'offer-1287609150'
2023/09/23 21:25:51.470303 answer: Sending 'answer-1325372035'
2023/09/23 21:25:51.470421 answer: message from DataChannel 'data': 'offer-1486812657'

这次运行是在本地同一主机下运行的。你也可以将信令服务器搭建在公网主机上,然后将answer peer和offer peer分别放到不同的公有云虚机上,你看看是否依然可以连通!我在阿里云上的测试结果是ok的(信令服务器放在美国)。

注:示例中使用的stun server:74.125.137.127:19302实际上就是stun.l.google.com:19302。

4. 小结

通过本文的讲解和示例,我们看到:基于WebRTC数据通道可以实现低延迟的P2P实时通信。Go语言通过Pion等项目库提供了对开发WebRTC的支持。通过信令服务器协调Offer/Answer模型,可以建立起端到端的数据通道。未来WebRTC数据通道可用于更多像实时协同、远程控制等应用场景。

本文代码示例可在这里下载。

注:本文示例仅是用作展示如何使用Go进行WebRTC应用的开发,对异常处理等方面并未做太多考虑,不要将示例代码用作生产环境。另外gorilla的websocket.Conn并非始终是goroutine safe的,示例中代码对websocket.Conn的保护并不那么充分。

5. 参考资料


“Gopher部落”知识星球旨在打造一个精品Go学习和进阶社群!高品质首发Go技术文章,“三天”首发阅读权,每年两期Go语言发展现状分析,每天提前1小时阅读到新鲜的Gopher日报,网课、技术专栏、图书内容前瞻,六小时内必答保证等满足你关于Go语言生态的所有需求!2023年,Gopher部落将进一步聚焦于如何编写雅、地道、可读、可测试的Go代码,关注代码质量并深入理解Go核心技术,并继续加强与星友的互动。欢迎大家加入!

img{512x368}
img{512x368}

img{512x368}
img{512x368}

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻) – https://gopherdaily.tonybai.com

我的联系方式:

  • 微博(暂不可用):https://weibo.com/bigwhite20xx
  • 微博2:https://weibo.com/u/6484441286
  • 博客:tonybai.com
  • github: https://github.com/bigwhite
  • Gopher Daily归档 – https://github.com/bigwhite/gopherdaily

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

如发现本站页面被黑,比如:挂载广告、挖矿等恶意代码,请朋友们及时联系我。十分感谢! Go语言第一课 Go语言精进之路1 Go语言精进之路2 商务合作请联系bigwhite.cn AT aliyun.com

欢迎使用邮件订阅我的博客

输入邮箱订阅本站,只要有新文章发布,就会第一时间发送邮件通知你哦!

这里是 Tony Bai的个人Blog,欢迎访问、订阅和留言! 订阅Feed请点击上面图片

如果您觉得这里的文章对您有帮助,请扫描上方二维码进行捐赠 ,加油后的Tony Bai将会为您呈现更多精彩的文章,谢谢!

如果您希望通过微信捐赠,请用微信客户端扫描下方赞赏码:

如果您希望通过比特币或以太币捐赠,可以扫描下方二维码:

比特币:

以太币:

如果您喜欢通过微信浏览本站内容,可以扫描下方二维码,订阅本站官方微信订阅号“iamtonybai”;点击二维码,可直达本人官方微博主页^_^:
本站Powered by Digital Ocean VPS。
选择Digital Ocean VPS主机,即可获得10美元现金充值,可 免费使用两个月哟! 著名主机提供商Linode 10$优惠码:linode10,在 这里注册即可免费获 得。阿里云推荐码: 1WFZ0V立享9折!


View Tony Bai's profile on LinkedIn
DigitalOcean Referral Badge

文章

评论

  • 正在加载...

分类

标签

归档



View My Stats