所有文章

DOCKER源码-创建容器

Daemon结构体

daemon/daemon.go

type Daemon struct {
    // ... 省略多行 ...
    containerdCli         *containerd.Client
    containerd            libcontainerdtypes.Client
    volumes           *volumesservice.VolumesService
    // ... 省略多行 ...
}

client结构体

Daemon中的containerd字段负责容器相关操作,Daemon对象所有对容器的操作都是通过调用containerd对象相应函数来完成的,它是一个接口,相应的实现定义在libcontainerd/remote/client.go文件中,它具有以下函数(只列出部分):

func (c *client) Version(ctx context.Context) (containerd.Version, error)

func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error)

func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error

func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error)

func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error)

Client结构体

client结构体是一个较高层的封装,它在实例化的时候依赖Client结构体,而Client结构体实际是一个RPC客户端,所有容器操作都是通过RPC调用containerd进程,关键代码:
vendor/github.com/containerd/containerd/api/services/containers/v1/containers.pb.go

func (c *containersClient) Create(ctx context.Context, in *CreateContainerRequest, 
    opts ...grpc.CallOption) (*CreateContainerResponse, error) {
    out := new(CreateContainerResponse)
    err := c.cc.Invoke(ctx, "/containerd.services.containers.v1.Containers/Create", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

containerd程序的启动

下面进入containerd项目,看一下containerd是怎样处理RPC请求的,先简单说一下containerd的启动流程:
main()函数定义在cmd/containerd/main.go

func main() {
    app := command.App()
    if err := app.Run(os.Args); err != nil {
        fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
        os.Exit(1)
    }
}

RPC Server接口定义

api/services/containers/v1/containers.pb.go

type ContainersServer interface {
    Get(context.Context, *GetContainerRequest) (*GetContainerResponse, error)
    List(context.Context, *ListContainersRequest) (*ListContainersResponse, error)
    ListStream(*ListContainersRequest, Containers_ListStreamServer) error
    Create(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
    Update(context.Context, *UpdateContainerRequest) (*UpdateContainerResponse, error)
    Delete(context.Context, *DeleteContainerRequest) (*types.Empty, error)
}

RPC路由定义

ContainersServer接口没有直接实现,而是通过路由方式将请求映射到相应函数,以下路由定义:

var _Containers_serviceDesc = grpc.ServiceDesc{
    // ... 省略多行 ...
        {
            MethodName: "Create",
            Handler:    _Containers_Create_Handler,
        },
    // ... 省略多行 ...
}

RPC Server接口实现

可以看到,负责创建容器的函数是_Containers_Create_Handler()
api/services/containers/v1/containers.pb.go

func _Containers_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(CreateContainerRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(ContainersServer).Create(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/containerd.services.containers.v1.Containers/Create",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(ContainersServer).Create(ctx, req.(*CreateContainerRequest))
    }
    return interceptor(ctx, in, info, handler)
}

路由注册

由上面函数可以知道,创建容器的动作其实是调用srv对象的Create()函数,那srv对象是谁传进来的呢?我们看一下上面路由时怎样注册的,上面的_Containers_serviceDesc变量在注册RPC Server的时候被使用:

func RegisterContainersServer(s *grpc.Server, srv ContainersServer) {
    s.RegisterService(&_Containers_serviceDesc, srv)
}

RegisterContainersServer()函数又是由server调用的:
services/containers/service.go

func (s *service) Register(server *grpc.Server) error {
    api.RegisterContainersServer(server, s)
    return nil
}

RPC接口实现

由此可见,RPC Server中的Create()函数其实是调用service对象的Create()函数:
services/containers/service.go

func (s *service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
    return s.local.Create(ctx, req)
}

local结构体

而service对象的Create()函数是调用的local字段的Create()函数,local是一个插件,它是在local包被加载的时候注册的,local的Create()函数如下:
services/containers/local.go

func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {
    var resp api.CreateContainerResponse
    if err := l.withStoreUpdate(ctx, func(ctx context.Context) error {
        container := containerFromProto(&req.Container)
        created, err := l.Store.Create(ctx, container)
        if err != nil {
            return err
        }
        resp.Container = containerToProto(&created)
        return nil
    }); err != nil {
        return &resp, errdefs.ToGRPC(err)
    }
    if err := l.publisher.Publish(ctx, "/containers/create", &eventstypes.ContainerCreate{
        ID:    resp.Container.ID,
        Image: resp.Container.Image,
        Runtime: &eventstypes.ContainerCreate_Runtime{
            Name:    resp.Container.Runtime.Name,
            Options: resp.Container.Runtime.Options,
        },
    }); err != nil {
        return &resp, err
    }
    return &resp, nil
}

运行时插件

local调用了db字段的Create()函数,db是一个运行时插件,在1.2.13版本的containerd中包含两个版本的运行时:
runtime/v1/linux/runtime.go

func init() {
    plugin.Register(&plugin.Registration{
        Type:   plugin.RuntimePlugin,
        ID:     "linux",
        InitFn: New,
        Requires: []plugin.Type{
            plugin.MetadataPlugin,
        },
        Config: &Config{
            Shim:    defaultShim,
            Runtime: defaultRuntime,
        },
    })
}

runtime/v2/manager.go

func init() {
    plugin.Register(&plugin.Registration{
        Type: plugin.RuntimePluginV2,
        ID:   "task",
        Requires: []plugin.Type{
            plugin.MetadataPlugin,
        },
        Config: &Config{
            Platforms: defaultPlatforms(),
        },
        InitFn: func(ic *plugin.InitContext) (interface{}, error) {
            supportedPlatforms, err := parsePlatforms(ic.Config.(*Config).Platforms)
            if err != nil {
                return nil, err
            }
            ic.Meta.Platforms = supportedPlatforms
            if err := os.MkdirAll(ic.Root, 0711); err != nil {
                return nil, err
            }
            if err := os.MkdirAll(ic.State, 0711); err != nil {
                return nil, err
            }
            m, err := ic.Get(plugin.MetadataPlugin)
            if err != nil {
                return nil, err
            }
            cs := metadata.NewContainerStore(m.(*metadata.DB))
            return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs)
        },
    })
}

调用运行时插件

运行时插件包含两部分,shim和runc,shim来控制创建容器的整个流程,而设置容器的cgroup、namespace、启动容器中的进程等工作则调用runc来完成。
以v2版本为例,创建容器的函数如下:
runtime/v2/runc/v2/service.go

func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    container, err := runc.NewContainer(ctx, s.platform, r)
    if err != nil {
        return nil, err
    }
    s.containers[r.ID] = container
    s.send(&eventstypes.TaskCreate{
        ContainerID: r.ID,
        Bundle:      r.Bundle,
        Rootfs:      r.Rootfs,
        IO: &eventstypes.TaskIO{
            Stdin:    r.Stdin,
            Stdout:   r.Stdout,
            Stderr:   r.Stderr,
            Terminal: r.Terminal,
        },
        Checkpoint: r.Checkpoint,
        Pid:        uint32(container.Pid()),
    })
    return &taskAPI.CreateTaskResponse{
        Pid: uint32(container.Pid()),
    }, nil
}

创建containerd-shim进程

此时的调用链如下:
调用运行时的Create()函数:

runtime/v2/manager.go:Create()

构建containerd-shim命令对象,准备执行:

runtime/v2/binary.go:Start()

构建一个命令对象(形式为:container-shim –address /run/containerd/containerd.sock)并执行:

runtime/v2/shim/util.go:Command()

接下来进入containerd-shim程序的main()函数:

cmd/containerd-shim/main_unix.go:main()

开始执行shim任务:

cmd/containerd-shim/main_unix.go:executeShim()

创建TTRPC服务,用于与containerd进程通信。TTRPC基于GRPC开发,以翻译自TTRPC的官方描述:

TTRPC是用于低内存环境的GRPC,现有的grpc-go项目需要大量的内存开销来导入包和运行时。这对于大多数服务来说是没问题的,但在低内存环境中这可能是一个问题。这个项目减少了二进制文件的大小和协议开销。我们通过省略”net/http”和”net/http2”来做到这一点和“grpc”包使用的grpc取代它与一个轻量级的框架协议。结果是使用较少驻留内存的较小二进制文件与GRPC一样易于使用。请注意,虽然这个项目支持生成的两端协议,生成的服务定义将与常规的不兼容GRPC服务,因为它们不使用相同的协议。

创建Service对象,该对象中包含容器的增删改查等函数:
cmd/containerd-shim/main_unix.go:executeShim()

    sv, err := shim.NewService(
        shim.Config{
            Path:          path,
            Namespace:     namespaceFlag,
            WorkDir:       workdirFlag,
            Criu:          criuFlag,
            SystemdCgroup: systemdCgroupFlag,
            RuntimeRoot:   runtimeRootFlag,
        },
        &remoteEventsPublisher{address: addressFlag},
    )

注册Service对象到TTRPC路由中:
cmd/containerd-shim/main_unix.go:executeShim()

shimapi.RegisterShimService(server, sv)

下面就到了Service对象的Create()函数,该函数包含创建容器的所有逻辑:
runtime/v1/shim/service.go

func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) {
    var mounts []process.Mount
    for _, m := range r.Rootfs {
        mounts = append(mounts, process.Mount{
            Type:    m.Type,
            Source:  m.Source,
            Target:  m.Target,
            Options: m.Options,
        })
    }
    rootfs := ""
    if len(mounts) > 0 {
        rootfs = filepath.Join(r.Bundle, "rootfs")
        if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
            return nil, err
        }
    }
    config := &process.CreateConfig{
        ID:               r.ID,
        Bundle:           r.Bundle,
        Runtime:          r.Runtime,
        Rootfs:           mounts,
        Terminal:         r.Terminal,
        Stdin:            r.Stdin,
        Stdout:           r.Stdout,
        Stderr:           r.Stderr,
        Checkpoint:       r.Checkpoint,
        ParentCheckpoint: r.ParentCheckpoint,
        Options:          r.Options,
    }
    defer func() {
        if err != nil {
            if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
                log.G(ctx).WithError(err2).Warn("Failed to cleanup rootfs mount")
            }
        }
    }()
    for _, rm := range mounts {
        m := &mount.Mount{
            Type:    rm.Type,
            Source:  rm.Source,
            Options: rm.Options,
        }
        if err := m.Mount(rootfs); err != nil {
            return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
        }
    }
    s.mu.Lock()
    defer s.mu.Unlock()
    process, err := newInit(
        ctx,
        s.config.Path,
        s.config.WorkDir,
        s.config.RuntimeRoot,
        s.config.Namespace,
        s.config.Criu,
        s.config.SystemdCgroup,
        s.platform,
        config,
        rootfs,
    )
    if err != nil {
        return nil, errdefs.ToGRPC(err)
    }
    if err := process.Create(ctx, config); err != nil {
        return nil, errdefs.ToGRPC(err)
    }
    // save the main task id and bundle to the shim for additional requests
    s.id = r.ID
    s.bundle = r.Bundle
    pid := process.Pid()
    s.processes[r.ID] = process
    return &shimapi.CreateTaskResponse{
        Pid: uint32(pid),
    }, nil
}

总结


编写日期:2020-03-13