From 22b206cc698d9edfb32664e68869a0c622c0eae0 Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin <makhotkin@selectel.ru> Date: Tue, 6 Dec 2016 12:10:17 +0300 Subject: [PATCH] Adding pod restarting while upgrading release * Added pod restarting for - ReplicationController - DaemonSet - PetSet --- pkg/kube/client.go | 64 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/pkg/kube/client.go b/pkg/kube/client.go index e721b69b9..3f64778ab 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -36,6 +36,12 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/strategicpatch" "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" ) // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found. @@ -199,7 +205,7 @@ func (c *Client) Update(namespace string, currentReader, targetReader io.Reader) return err } - if err := updateResource(info, currentObj); err != nil { + if err := updateResource(c, info, currentObj); err != nil { if alreadyExistErr, ok := err.(ErrAlreadyExists); ok { log.Printf(alreadyExistErr.errorMsg) } else { @@ -295,7 +301,7 @@ func deleteResource(info *resource.Info) error { return resource.NewHelper(info.Client, info.Mapping).Delete(info.Namespace, info.Name) } -func updateResource(target *resource.Info, currentObj runtime.Object) error { +func updateResource(c *Client, target *resource.Info, currentObj runtime.Object) error { encoder := api.Codecs.LegacyCodec(registered.EnabledVersions()...) original, err := runtime.Encode(encoder, currentObj) if err != nil { @@ -319,9 +325,63 @@ func updateResource(target *resource.Info, currentObj runtime.Object) error { // send patch to server helper := resource.NewHelper(target.Client, target.Mapping) _, err = helper.Patch(target.Namespace, target.Name, api.StrategicMergePatchType, patch) + + if err != nil { + return err + } + + kind := target.Mapping.GroupVersionKind.Kind + + client, _ := c.ClientSet() + switch kind { + case "ReplicationController": + rc := currentObj.(*v1.ReplicationController) + err = restartPods(client, target.Namespace, rc.Spec.Selector) + case "DaemonSet": + daemonSet := currentObj.(*v1beta1.DaemonSet) + err = restartPods(client, target.Namespace, daemonSet.Spec.Selector.MatchLabels) + case "StatefulSet": + petSet := currentObj.(*apps.StatefulSet) + err = restartPods(client, target.Namespace, petSet.Spec.Selector.MatchLabels) + case "ReplicaSet": + replicaSet := currentObj.(*v1beta1.ReplicaSet) + err = restartPods(client, target.Namespace, replicaSet.Spec.Selector.MatchLabels) + } + return err } + +func restartPods(client *internalclientset.Clientset, namespace string, selector map[string]string) error { + pods, err := client.Pods(namespace).List(api.ListOptions{ + FieldSelector: fields.Everything(), + LabelSelector: labels.Set(selector).AsSelector(), + }) + + if err != nil { + return err + } + + // Restart pods + for _, pod := range pods.Items { + log.Printf("Restarting pod: %v/%v", pod.Namespace, pod.Name) + + // Delete each pod for get them restarted with changed spec. + err := client.Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{ + Preconditions: &api.Preconditions{ + UID: &pod.UID, + }, + }) + + if err != nil { + return err + } + } + + return nil +} + + func watchUntilReady(info *resource.Info) error { w, err := resource.NewHelper(info.Client, info.Mapping).WatchSingle(info.Namespace, info.Name, info.ResourceVersion) if err != nil { -- GitLab