DOCKER源码-创建容器
Daemon结构体
daemon/daemon.go
type Daemon struct {
// ... 省略多行 ...
containerdCli *containerd.Client
containerd libcontainerdtypes.Client
volumes *volumesservice.VolumesService
// ... 省略多行 ...
}
client结构体
Daemon中的containerd字段负责容器相关操作,Daemon对象所有对容器的操作都是通过调用containerd对象相应函数来完成的,它是一个接口,相应的实现定义在libcontainerd/remote/client.go
文件中,它具有以下函数(只列出部分):
func (c *client) Version(ctx context.Context) (containerd.Version, error)
func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error)
func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}, opts ...containerd.NewContainerOpts) error
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error)
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error)
Client结构体
client结构体是一个较高层的封装,它在实例化的时候依赖Client结构体,而Client结构体实际是一个RPC客户端,所有容器操作都是通过RPC调用containerd进程,关键代码:
vendor/github.com/containerd/containerd/api/services/containers/v1/containers.pb.go
func (c *containersClient) Create(ctx context.Context, in *CreateContainerRequest,
opts ...grpc.CallOption) (*CreateContainerResponse, error) {
out := new(CreateContainerResponse)
err := c.cc.Invoke(ctx, "/containerd.services.containers.v1.Containers/Create", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
containerd程序的启动
下面进入containerd项目,看一下containerd是怎样处理RPC请求的,先简单说一下containerd的启动流程:
main()函数定义在cmd/containerd/main.go
:
func main() {
app := command.App()
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
os.Exit(1)
}
}
- 它先创建了一个app对象,该对象中包含一个启动函数
app.Action()
,该函数中启动了RPC Server。
- 紧接着执行APP对象的Run()函数,此时
app.Action()
函数被执行,RPC Server启动。
RPC Server接口定义
api/services/containers/v1/containers.pb.go
type ContainersServer interface {
Get(context.Context, *GetContainerRequest) (*GetContainerResponse, error)
List(context.Context, *ListContainersRequest) (*ListContainersResponse, error)
ListStream(*ListContainersRequest, Containers_ListStreamServer) error
Create(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
Update(context.Context, *UpdateContainerRequest) (*UpdateContainerResponse, error)
Delete(context.Context, *DeleteContainerRequest) (*types.Empty, error)
}
RPC路由定义
ContainersServer接口没有直接实现,而是通过路由方式将请求映射到相应函数,以下路由定义:
var _Containers_serviceDesc = grpc.ServiceDesc{
// ... 省略多行 ...
{
MethodName: "Create",
Handler: _Containers_Create_Handler,
},
// ... 省略多行 ...
}
RPC Server接口实现
可以看到,负责创建容器的函数是_Containers_Create_Handler()
:
api/services/containers/v1/containers.pb.go
func _Containers_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateContainerRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ContainersServer).Create(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/containerd.services.containers.v1.Containers/Create",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ContainersServer).Create(ctx, req.(*CreateContainerRequest))
}
return interceptor(ctx, in, info, handler)
}
路由注册
由上面函数可以知道,创建容器的动作其实是调用srv对象的Create()函数,那srv对象是谁传进来的呢?我们看一下上面路由时怎样注册的,上面的_Containers_serviceDesc
变量在注册RPC Server的时候被使用:
func RegisterContainersServer(s *grpc.Server, srv ContainersServer) {
s.RegisterService(&_Containers_serviceDesc, srv)
}
RegisterContainersServer()
函数又是由server调用的:
services/containers/service.go
func (s *service) Register(server *grpc.Server) error {
api.RegisterContainersServer(server, s)
return nil
}
RPC接口实现
由此可见,RPC Server中的Create()函数其实是调用service对象的Create()函数:
services/containers/service.go
func (s *service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
return s.local.Create(ctx, req)
}
local结构体
而service对象的Create()函数是调用的local字段的Create()函数,local是一个插件,它是在local包被加载的时候注册的,local的Create()函数如下:
services/containers/local.go
func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {
var resp api.CreateContainerResponse
if err := l.withStoreUpdate(ctx, func(ctx context.Context) error {
container := containerFromProto(&req.Container)
created, err := l.Store.Create(ctx, container)
if err != nil {
return err
}
resp.Container = containerToProto(&created)
return nil
}); err != nil {
return &resp, errdefs.ToGRPC(err)
}
if err := l.publisher.Publish(ctx, "/containers/create", &eventstypes.ContainerCreate{
ID: resp.Container.ID,
Image: resp.Container.Image,
Runtime: &eventstypes.ContainerCreate_Runtime{
Name: resp.Container.Runtime.Name,
Options: resp.Container.Runtime.Options,
},
}); err != nil {
return &resp, err
}
return &resp, nil
}
运行时插件
local调用了db字段的Create()函数,db是一个运行时插件,在1.2.13版本的containerd中包含两个版本的运行时:
runtime/v1/linux/runtime.go
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePlugin,
ID: "linux",
InitFn: New,
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Config: &Config{
Shim: defaultShim,
Runtime: defaultRuntime,
},
})
}
runtime/v2/manager.go
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePluginV2,
ID: "task",
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Config: &Config{
Platforms: defaultPlatforms(),
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
supportedPlatforms, err := parsePlatforms(ic.Config.(*Config).Platforms)
if err != nil {
return nil, err
}
ic.Meta.Platforms = supportedPlatforms
if err := os.MkdirAll(ic.Root, 0711); err != nil {
return nil, err
}
if err := os.MkdirAll(ic.State, 0711); err != nil {
return nil, err
}
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
cs := metadata.NewContainerStore(m.(*metadata.DB))
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs)
},
})
}
调用运行时插件
运行时插件包含两部分,shim和runc,shim来控制创建容器的整个流程,而设置容器的cgroup、namespace、启动容器中的进程等工作则调用runc来完成。
以v2版本为例,创建容器的函数如下:
runtime/v2/runc/v2/service.go
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()
container, err := runc.NewContainer(ctx, s.platform, r)
if err != nil {
return nil, err
}
s.containers[r.ID] = container
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(container.Pid()),
})
return &taskAPI.CreateTaskResponse{
Pid: uint32(container.Pid()),
}, nil
}
创建containerd-shim进程
此时的调用链如下:
调用运行时的Create()函数:
runtime/v2/manager.go:Create()
构建containerd-shim命令对象,准备执行:
runtime/v2/binary.go:Start()
构建一个命令对象(形式为:container-shim –address /run/containerd/containerd.sock)并执行:
runtime/v2/shim/util.go:Command()
接下来进入containerd-shim程序的main()函数:
cmd/containerd-shim/main_unix.go:main()
开始执行shim任务:
cmd/containerd-shim/main_unix.go:executeShim()
创建TTRPC服务,用于与containerd进程通信。TTRPC基于GRPC开发,以翻译自TTRPC的官方描述:
TTRPC是用于低内存环境的GRPC,现有的grpc-go项目需要大量的内存开销来导入包和运行时。这对于大多数服务来说是没问题的,但在低内存环境中这可能是一个问题。这个项目减少了二进制文件的大小和协议开销。我们通过省略”net/http”和”net/http2”来做到这一点和“grpc”包使用的grpc取代它与一个轻量级的框架协议。结果是使用较少驻留内存的较小二进制文件与GRPC一样易于使用。请注意,虽然这个项目支持生成的两端协议,生成的服务定义将与常规的不兼容GRPC服务,因为它们不使用相同的协议。
创建Service对象,该对象中包含容器的增删改查等函数:
cmd/containerd-shim/main_unix.go:executeShim()
sv, err := shim.NewService(
shim.Config{
Path: path,
Namespace: namespaceFlag,
WorkDir: workdirFlag,
Criu: criuFlag,
SystemdCgroup: systemdCgroupFlag,
RuntimeRoot: runtimeRootFlag,
},
&remoteEventsPublisher{address: addressFlag},
)
注册Service对象到TTRPC路由中:
cmd/containerd-shim/main_unix.go:executeShim()
shimapi.RegisterShimService(server, sv)
下面就到了Service对象的Create()函数,该函数包含创建容器的所有逻辑:
runtime/v1/shim/service.go
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) {
var mounts []process.Mount
for _, m := range r.Rootfs {
mounts = append(mounts, process.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
Options: m.Options,
})
}
rootfs := ""
if len(mounts) > 0 {
rootfs = filepath.Join(r.Bundle, "rootfs")
if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
return nil, err
}
}
config := &process.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: r.Runtime,
Rootfs: mounts,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
}
defer func() {
if err != nil {
if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
log.G(ctx).WithError(err2).Warn("Failed to cleanup rootfs mount")
}
}
}()
for _, rm := range mounts {
m := &mount.Mount{
Type: rm.Type,
Source: rm.Source,
Options: rm.Options,
}
if err := m.Mount(rootfs); err != nil {
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
}
}
s.mu.Lock()
defer s.mu.Unlock()
process, err := newInit(
ctx,
s.config.Path,
s.config.WorkDir,
s.config.RuntimeRoot,
s.config.Namespace,
s.config.Criu,
s.config.SystemdCgroup,
s.platform,
config,
rootfs,
)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := process.Create(ctx, config); err != nil {
return nil, errdefs.ToGRPC(err)
}
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
return &shimapi.CreateTaskResponse{
Pid: uint32(pid),
}, nil
}
总结
- dockerd进程和containerd进程之间用GRPC进行通信,dockerd需要创建容器时发送RPC请求给containerd。
- containerd执行containerd-shim命令并创建一个新进程。
- 新的containerd-shim进程调用runc在物理机上创建容器,然后启动一个TTRPC服务用于接受containerd发来的其他调用。
- 当用户需要在容器内执行其他任务时,containerd发送TTRPC请求给containerd-shim,containerd-shim执行Task模块在容器中创建新的进程。