diff --git a/pkg/container/container_types.go b/pkg/container/container_types.go index 063b422..ea0d5e7 100644 --- a/pkg/container/container_types.go +++ b/pkg/container/container_types.go @@ -29,6 +29,8 @@ type NewContainerInput struct { // Gitea specific AutoRemove bool + + NetworkAliases []string } // FileEntry is a file to copy to a container @@ -41,6 +43,7 @@ type FileEntry struct { // Container for managing docker run containers type Container interface { Create(capAdd []string, capDrop []string) common.Executor + ConnectToNetwork(name string) common.Executor Copy(destPath string, files ...*FileEntry) common.Executor CopyDir(destPath string, srcPath string, useGitIgnore bool) common.Executor GetContainerArchive(ctx context.Context, srcPath string) (io.ReadCloser, error) diff --git a/pkg/container/docker_network.go b/pkg/container/docker_network.go new file mode 100644 index 0000000..76394a9 --- /dev/null +++ b/pkg/container/docker_network.go @@ -0,0 +1,38 @@ +package container + +import ( + "context" + + "github.com/docker/docker/api/types" + "github.com/nektos/act/pkg/common" +) + +func NewDockerNetworkCreateExecutor(name string) common.Executor { + return func(ctx context.Context) error { + cli, err := GetDockerClient(ctx) + if err != nil { + return err + } + + _, err = cli.NetworkCreate(ctx, name, types.NetworkCreate{ + Driver: "bridge", + Scope: "local", + }) + if err != nil { + return err + } + + return nil + } +} + +func NewDockerNetworkRemoveExecutor(name string) common.Executor { + return func(ctx context.Context) error { + cli, err := GetDockerClient(ctx) + if err != nil { + return err + } + + return cli.NetworkRemove(ctx, name) + } +} diff --git a/pkg/container/docker_run.go b/pkg/container/docker_run.go index 5afd8e0..1dac4c7 100644 --- a/pkg/container/docker_run.go +++ b/pkg/container/docker_run.go @@ -16,6 +16,8 @@ import ( "strconv" "strings" + networktypes "github.com/docker/docker/api/types/network" + "github.com/go-git/go-billy/v5/helper/polyfill" "github.com/go-git/go-billy/v5/osfs" "github.com/go-git/go-git/v5/plumbing/format/gitignore" @@ -46,6 +48,25 @@ func NewContainer(input *NewContainerInput) ExecutionsEnvironment { return cr } +func (cr *containerReference) ConnectToNetwork(name string) common.Executor { + return common. + NewDebugExecutor("%sdocker network connect %s %s", logPrefix, name, cr.input.Name). + Then( + common.NewPipelineExecutor( + cr.connect(), + cr.connectToNetwork(name, cr.input.NetworkAliases), + ).IfNot(common.Dryrun), + ) +} + +func (cr *containerReference) connectToNetwork(name string, aliases []string) common.Executor { + return func(ctx context.Context) error { + return cr.cli.NetworkConnect(ctx, name, cr.input.Name, &networktypes.EndpointSettings{ + Aliases: aliases, + }) + } +} + // supportsContainerImagePlatform returns true if the underlying Docker server // API version is 1.41 and beyond func supportsContainerImagePlatform(ctx context.Context, cli client.APIClient) bool { diff --git a/pkg/container/host_environment.go b/pkg/container/host_environment.go index 5d8c7dc..3ca06f3 100644 --- a/pkg/container/host_environment.go +++ b/pkg/container/host_environment.go @@ -40,6 +40,12 @@ func (e *HostEnvironment) Create(capAdd []string, capDrop []string) common.Execu } } +func (e *HostEnvironment) ConnectToNetwork(name string) common.Executor { + return func(ctx context.Context) error { + return nil + } +} + func (e *HostEnvironment) Close() common.Executor { return func(ctx context.Context) error { return nil diff --git a/pkg/runner/job_executor.go b/pkg/runner/job_executor.go index e20bf59..7dc3389 100644 --- a/pkg/runner/job_executor.go +++ b/pkg/runner/job_executor.go @@ -114,7 +114,21 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo // always allow 1 min for stopping and removing the runner, even if we were cancelled ctx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute) defer cancel() + + logger := common.Logger(ctx) + logger.Infof("Cleaning up services for job %s", rc.JobName) + if err := rc.stopServiceContainers()(ctx); err != nil { + logger.Errorf("Error while cleaning services: %v", err) + } + + logger.Infof("Cleaning up container for job %s", rc.JobName) err = info.stopContainer()(ctx) + + logger.Infof("Cleaning up network for job %s", rc.JobName) + networkName := fmt.Sprintf("%s-network", rc.jobContainerName()) + if err := rc.removeNetwork(networkName)(ctx); err != nil { + logger.Errorf("Error while cleaning network: %v", err) + } } setJobResult(ctx, info, rc, jobError == nil) setJobOutputs(ctx, rc) diff --git a/pkg/runner/run_context.go b/pkg/runner/run_context.go index 31d19b5..398d3ae 100644 --- a/pkg/runner/run_context.go +++ b/pkg/runner/run_context.go @@ -18,6 +18,7 @@ import ( "strings" "time" + "github.com/docker/docker/errdefs" "github.com/mitchellh/go-homedir" "github.com/opencontainers/selinux/go-selinux" log "github.com/sirupsen/logrus" @@ -43,6 +44,7 @@ type RunContext struct { IntraActionState map[string]map[string]string ExprEval ExpressionEvaluator JobContainer container.ExecutionsEnvironment + ServiceContainers []container.ExecutionsEnvironment OutputMappings map[MappableOutput]MappableOutput JobName string ActionPath string @@ -242,6 +244,38 @@ func (rc *RunContext) startJobContainer() common.Executor { ext := container.LinuxContainerEnvironmentExtensions{} binds, mounts := rc.GetBindsAndMounts() + // add service containers + for name, spec := range rc.Run.Job().Services { + mergedEnv := envList + for k, v := range spec.Env { + mergedEnv = append(mergedEnv, fmt.Sprintf("%s=%s", k, v)) + } + serviceContainerName := createSimpleContainerName(rc.jobContainerName(), name) + c := container.NewContainer(&container.NewContainerInput{ + Name: serviceContainerName, + WorkingDir: ext.ToContainerPath(rc.Config.Workdir), + Image: spec.Image, + Username: username, + Password: password, + Env: mergedEnv, + Mounts: map[string]string{ + // TODO merge volumes + name: ext.ToContainerPath(rc.Config.Workdir), + "act-toolcache": "/toolcache", + "act-actions": "/actions", + }, + Binds: binds, + Stdout: logWriter, + Stderr: logWriter, + Privileged: rc.Config.Privileged, + UsernsMode: rc.Config.UsernsMode, + Platform: rc.Config.ContainerArchitecture, + AutoRemove: rc.Config.AutoRemove, + NetworkAliases: []string{name}, + }) + rc.ServiceContainers = append(rc.ServiceContainers, c) + } + rc.cleanUpJobContainer = func(ctx context.Context) error { if rc.JobContainer != nil && !rc.Config.ReuseContainers { return rc.JobContainer.Remove(). @@ -275,11 +309,24 @@ func (rc *RunContext) startJobContainer() common.Executor { return errors.New("Failed to create job container") } + networkName := fmt.Sprintf("%s-network", rc.jobContainerName()) return common.NewPipelineExecutor( + rc.pullServicesImages(rc.Config.ForcePull), rc.JobContainer.Pull(rc.Config.ForcePull), + rc.stopServiceContainers(), rc.stopJobContainer(), + func(ctx context.Context) error { + err := rc.removeNetwork(networkName)(ctx) + if errdefs.IsNotFound(err) { + return nil + } + return err + }, + rc.createNetwork(networkName), + rc.startServiceContainers(networkName), rc.JobContainer.Create(rc.Config.ContainerCapAdd, rc.Config.ContainerCapDrop), rc.JobContainer.Start(false), + rc.JobContainer.ConnectToNetwork(networkName), rc.JobContainer.Copy(rc.JobContainer.GetActPath()+"/", &container.FileEntry{ Name: "workflow/event.json", Mode: 0o644, @@ -293,6 +340,18 @@ func (rc *RunContext) startJobContainer() common.Executor { } } +func (rc *RunContext) createNetwork(name string) common.Executor { + return func(ctx context.Context) error { + return container.NewDockerNetworkCreateExecutor(name)(ctx) + } +} + +func (rc *RunContext) removeNetwork(name string) common.Executor { + return func(ctx context.Context) error { + return container.NewDockerNetworkRemoveExecutor(name)(ctx) + } +} + func (rc *RunContext) execJobContainer(cmd []string, env map[string]string, user, workdir string) common.Executor { return func(ctx context.Context) error { return rc.JobContainer.Exec(cmd, env, user, workdir)(ctx) @@ -354,6 +413,41 @@ func (rc *RunContext) stopJobContainer() common.Executor { } } +func (rc *RunContext) pullServicesImages(forcePull bool) common.Executor { + return func(ctx context.Context) error { + execs := []common.Executor{} + for _, c := range rc.ServiceContainers { + execs = append(execs, c.Pull(forcePull)) + } + return common.NewParallelExecutor(len(execs), execs...)(ctx) + } +} + +func (rc *RunContext) startServiceContainers(networkName string) common.Executor { + return func(ctx context.Context) error { + execs := []common.Executor{} + for _, c := range rc.ServiceContainers { + execs = append(execs, common.NewPipelineExecutor( + c.Pull(false), + c.Create(rc.Config.ContainerCapAdd, rc.Config.ContainerCapDrop), + c.Start(false), + c.ConnectToNetwork(networkName), + )) + } + return common.NewParallelExecutor(len(execs), execs...)(ctx) + } +} + +func (rc *RunContext) stopServiceContainers() common.Executor { + return func(ctx context.Context) error { + execs := []common.Executor{} + for _, c := range rc.ServiceContainers { + execs = append(execs, c.Remove()) + } + return common.NewParallelExecutor(len(execs), execs...)(ctx) + } +} + // Prepare the mounts and binds for the worker // ActionCacheDir is for rc diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index 60a8193..a691387 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -546,6 +546,43 @@ func TestRunEventSecrets(t *testing.T) { tjfi.runTest(context.Background(), t, &Config{Secrets: secrets, Env: env}) } +func TestRunWithService(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + log.SetLevel(log.DebugLevel) + ctx := context.Background() + + platforms := map[string]string{ + "ubuntu-latest": "node:12.20.1-buster-slim", + } + + workflowPath := "services" + eventName := "push" + + workdir, err := filepath.Abs("testdata") + assert.NoError(t, err, workflowPath) + + runnerConfig := &Config{ + Workdir: workdir, + EventName: eventName, + Platforms: platforms, + ReuseContainers: false, + } + runner, err := New(runnerConfig) + assert.NoError(t, err, workflowPath) + + planner, err := model.NewWorkflowPlanner(fmt.Sprintf("testdata/%s", workflowPath), true) + assert.NoError(t, err, workflowPath) + + plan, err := planner.PlanEvent(eventName) + assert.NoError(t, err, workflowPath) + + err = runner.NewPlanExecutor(plan)(ctx) + assert.NoError(t, err, workflowPath) +} + func TestRunActionInputs(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") diff --git a/pkg/runner/testdata/services/push.yaml b/pkg/runner/testdata/services/push.yaml new file mode 100644 index 0000000..f6ca7bc --- /dev/null +++ b/pkg/runner/testdata/services/push.yaml @@ -0,0 +1,26 @@ +name: services +on: push +jobs: + services: + name: Reproduction of failing Services interpolation + runs-on: ubuntu-latest + services: + postgres: + image: postgres:12 + env: + POSTGRES_USER: runner + POSTGRES_PASSWORD: mysecretdbpass + POSTGRES_DB: mydb + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + steps: + - name: Echo the Postgres service ID / Network / Ports + run: | + echo "id: ${{ job.services.postgres.id }}" + echo "network: ${{ job.services.postgres.network }}" + echo "ports: ${{ job.services.postgres.ports }}"