diff --git a/cmd/nfd-worker/main.go b/cmd/nfd-worker/main.go index bf41fa88e6..beb8b41053 100644 --- a/cmd/nfd-worker/main.go +++ b/cmd/nfd-worker/main.go @@ -19,7 +19,9 @@ package main import ( "flag" "fmt" + "net" "os" + "strings" "k8s.io/klog/v2" @@ -32,7 +34,8 @@ import ( const ( // ProgramName is the canonical name of this program - ProgramName = "nfd-worker" + ProgramName = "nfd-worker" + kubeletSecurePort = 10250 ) func main() { @@ -82,6 +85,20 @@ func parseArgs(flags *flag.FlagSet, osArgs ...string) *worker.Args { os.Exit(2) } + if len(args.KubeletConfigURI) == 0 { + nodeAddress := os.Getenv("NODE_ADDRESS") + if len(nodeAddress) == 0 { + _, _ = fmt.Fprintf(flags.Output(), "unable to determine the default kubelet config endpoint 'https://${NODE_ADDRESS}:%d/configz' due to empty NODE_ADDRESS environment, "+ + "please either define the NODE_ADDRESS environment variable or specify endpoint with the -kubelet-config-uri flag\n", kubeletSecurePort) + os.Exit(1) + } + if isIPv6(nodeAddress) { + // With IPv6 we need to wrap the IP address in brackets as we append :port below + nodeAddress = "[" + nodeAddress + "]" + } + args.KubeletConfigURI = fmt.Sprintf("https://%s:%d/configz", nodeAddress, kubeletSecurePort) + } + // Handle overrides flags.Visit(func(f *flag.Flag) { switch f.Name { @@ -106,6 +123,10 @@ func initFlags(flagset *flag.FlagSet) (*worker.Args, *worker.ConfigOverrideArgs) "Config file to use.") flagset.StringVar(&args.Kubeconfig, "kubeconfig", "", "Kubeconfig to use") + flagset.StringVar(&args.KubeletConfigURI, "kubelet-config-uri", "", + "Kubelet config URI path. Default to kubelet configz endpoint.") + flagset.StringVar(&args.APIAuthTokenFile, "api-auth-token-file", "/var/run/secrets/kubernetes.io/serviceaccount/token", + "API auth token file path. It is used to request kubelet configz endpoint, only takes effect when kubelet-config-uri is https. Default to /var/run/secrets/kubernetes.io/serviceaccount/token.") flagset.BoolVar(&args.Oneshot, "oneshot", false, "Do not publish feature labels") flagset.IntVar(&args.Port, "port", 8080, @@ -134,3 +155,8 @@ func initFlags(flagset *flag.FlagSet) (*worker.Args, *worker.ConfigOverrideArgs) return args, overrides } + +func isIPv6(addr string) bool { + ip := net.ParseIP(addr) + return ip != nil && strings.Count(ip.String(), ":") >= 2 +} diff --git a/deployment/base/rbac/kustomization.yaml b/deployment/base/rbac/kustomization.yaml index 6eb2d8a8c7..eebfd478c4 100644 --- a/deployment/base/rbac/kustomization.yaml +++ b/deployment/base/rbac/kustomization.yaml @@ -10,3 +10,5 @@ resources: - worker-serviceaccount.yaml - worker-role.yaml - worker-rolebinding.yaml +- worker-clusterrole.yaml +- worker-clusterrolebinding.yaml \ No newline at end of file diff --git a/deployment/base/rbac/worker-clusterrole.yaml b/deployment/base/rbac/worker-clusterrole.yaml new file mode 100644 index 0000000000..eea36da780 --- /dev/null +++ b/deployment/base/rbac/worker-clusterrole.yaml @@ -0,0 +1,12 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: nfd-worker +rules: +- apiGroups: + - "" + resources: ["pods"] + verbs: ["get"] +- apiGroups: [""] + resources: ["nodes/proxy"] + verbs: ["get"] diff --git a/deployment/base/rbac/worker-clusterrolebinding.yaml b/deployment/base/rbac/worker-clusterrolebinding.yaml new file mode 100644 index 0000000000..d80fefa7e2 --- /dev/null +++ b/deployment/base/rbac/worker-clusterrolebinding.yaml @@ -0,0 +1,12 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: nfd-worker +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: nfd-worker +subjects: +- kind: ServiceAccount + name: nfd-worker + namespace: default diff --git a/deployment/base/rbac/worker-role.yaml b/deployment/base/rbac/worker-role.yaml index 70ee649ab4..3396bbc6ef 100644 --- a/deployment/base/rbac/worker-role.yaml +++ b/deployment/base/rbac/worker-role.yaml @@ -17,4 +17,4 @@ rules: resources: - pods verbs: - - get + - get \ No newline at end of file diff --git a/deployment/components/common/env.yaml b/deployment/components/common/env.yaml index 68507291a8..15a411ae70 100644 --- a/deployment/components/common/env.yaml +++ b/deployment/components/common/env.yaml @@ -13,3 +13,11 @@ valueFrom: fieldRef: fieldPath: metadata.uid + - name: NODE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace diff --git a/pkg/nfd-topology-updater/nfd-topology-updater.go b/pkg/nfd-topology-updater/nfd-topology-updater.go index 0990ec5253..bd5deb610d 100644 --- a/pkg/nfd-topology-updater/nfd-topology-updater.go +++ b/pkg/nfd-topology-updater/nfd-topology-updater.go @@ -19,7 +19,6 @@ package nfdtopologyupdater import ( "fmt" "net/http" - "net/url" "os" "path/filepath" @@ -99,7 +98,7 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (Nf } go ntf.Run() - kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile) + kubeletConfigFunc, err := kubeconf.GetKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile) if err != nil { return nil, err } @@ -379,38 +378,3 @@ func updateAttributes(lhs *v1alpha2.AttributeList, rhs v1alpha2.AttributeList) { updateAttribute(lhs, attr) } } - -func getKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) { - u, err := url.ParseRequestURI(uri) - if err != nil { - return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err) - } - - // init kubelet API client - var klConfig *kubeletconfigv1beta1.KubeletConfiguration - switch u.Scheme { - case "file": - return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) { - klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path) - if err != nil { - return nil, fmt.Errorf("failed to read kubelet config: %w", err) - } - return klConfig, err - }, nil - case "https": - restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile) - if err != nil { - return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err) - } - - return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) { - klConfig, err = kubeconf.GetKubeletConfiguration(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err) - } - return klConfig, nil - }, nil - } - - return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme) -} diff --git a/pkg/nfd-worker/nfd-worker.go b/pkg/nfd-worker/nfd-worker.go index da9e50e571..49ad7a9848 100644 --- a/pkg/nfd-worker/nfd-worker.go +++ b/pkg/nfd-worker/nfd-worker.go @@ -38,8 +38,10 @@ import ( "k8s.io/apimachinery/pkg/util/validation" k8sclient "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" "k8s.io/utils/ptr" klogutils "sigs.k8s.io/node-feature-discovery/pkg/utils/klog" + "sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf" "sigs.k8s.io/yaml" apiequality "k8s.io/apimachinery/pkg/api/equality" @@ -56,7 +58,7 @@ import ( _ "sigs.k8s.io/node-feature-discovery/source/fake" _ "sigs.k8s.io/node-feature-discovery/source/kernel" _ "sigs.k8s.io/node-feature-discovery/source/local" - _ "sigs.k8s.io/node-feature-discovery/source/memory" + memory "sigs.k8s.io/node-feature-discovery/source/memory" _ "sigs.k8s.io/node-feature-discovery/source/network" _ "sigs.k8s.io/node-feature-discovery/source/pci" _ "sigs.k8s.io/node-feature-discovery/source/storage" @@ -94,13 +96,16 @@ type Labels map[string]string // Args are the command line arguments of NfdWorker. type Args struct { - ConfigFile string - Klog map[string]*utils.KlogFlagVal - Kubeconfig string - Oneshot bool - Options string - Port int - NoOwnerRefs bool + ConfigFile string + Klog map[string]*utils.KlogFlagVal + Kubeconfig string + Oneshot bool + Options string + Port int + NoOwnerRefs bool + KubeletConfigPath string + KubeletConfigURI string + APIAuthTokenFile string Overrides ConfigOverrideArgs } @@ -124,6 +129,7 @@ type nfdWorker struct { featureSources []source.FeatureSource labelSources []source.LabelSource ownerReference []metav1.OwnerReference + kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error) } // This ticker can represent infinite and normal intervals. @@ -169,12 +175,25 @@ func NewNfdWorker(opts ...NfdWorkerOption) (NfdWorker, error) { stop: make(chan struct{}), } + if nfd.args.ConfigFile != "" { + nfd.configFilePath = filepath.Clean(nfd.args.ConfigFile) + } + for _, o := range opts { o.apply(nfd) } - if nfd.args.ConfigFile != "" { - nfd.configFilePath = filepath.Clean(nfd.args.ConfigFile) + kubeletConfigFunc, err := kubeconf.GetKubeletConfigFunc(nfd.args.KubeletConfigURI, nfd.args.APIAuthTokenFile) + if err != nil { + return nil, err + } + + nfd = &nfdWorker{ + kubeletConfigFunc: kubeletConfigFunc, + } + + for _, o := range opts { + o.apply(nfd) } // k8sClient might've been set via opts by tests @@ -312,6 +331,12 @@ func (w *nfdWorker) Run() error { httpMux.Handle("/metrics", promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{})) registerVersion(version.Get()) + klConfig, err := w.kubeletConfigFunc() + if err != nil { + return err + } + memory.SetSwapMode(klConfig.MemorySwap.SwapBehavior) + err = w.runFeatureDiscovery() if err != nil { return err @@ -624,7 +649,7 @@ func (m *nfdWorker) updateNodeFeatureObject(labels Labels) error { return err } nodename := utils.NodeName() - namespace := m.kubernetesNamespace + namespace := os.Getenv("POD_NAMESPACE") features := source.GetAllFeatures() diff --git a/pkg/utils/kubeconf/kubelet_config_file.go b/pkg/utils/kubeconf/kubelet_config_file.go index 79be004477..713d7467dc 100644 --- a/pkg/utils/kubeconf/kubelet_config_file.go +++ b/pkg/utils/kubeconf/kubelet_config_file.go @@ -19,6 +19,7 @@ package kubeconf import ( "context" "fmt" + "net/url" kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" kubeletconfigscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme" @@ -55,3 +56,38 @@ func GetKubeletConfigFromLocalFile(kubeletConfigPath string) (*kubeletconfigv1be return kubeletConfig, nil } + +func GetKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) { + u, err := url.ParseRequestURI(uri) + if err != nil { + return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err) + } + + // init kubelet API client + var klConfig *kubeletconfigv1beta1.KubeletConfiguration + switch u.Scheme { + case "file": + return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) { + klConfig, err = GetKubeletConfigFromLocalFile(u.Path) + if err != nil { + return nil, fmt.Errorf("failed to read kubelet config: %w", err) + } + return klConfig, err + }, nil + case "https": + restConfig, err := InsecureConfig(u.String(), apiAuthTokenFile) + if err != nil { + return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err) + } + + return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) { + klConfig, err = GetKubeletConfiguration(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err) + } + return klConfig, nil + }, nil + } + + return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme) +} diff --git a/source/memory/memory.go b/source/memory/memory.go index 68ef0c741f..7d931e0fa9 100644 --- a/source/memory/memory.go +++ b/source/memory/memory.go @@ -56,11 +56,17 @@ type memorySource struct { // Singleton source instance var ( - src memorySource - _ source.FeatureSource = &src - _ source.LabelSource = &src + src memorySource + _ source.FeatureSource = &src + _ source.LabelSource = &src + defaultSwapBehavior = "NoSwap" + swapBehavior string ) +func SetSwapMode(behavior string) { + swapBehavior = behavior +} + // Name returns an identifier string for this feature source. func (s *memorySource) Name() string { return Name } @@ -80,6 +86,7 @@ func (s *memorySource) GetLabels() (source.FeatureLabels, error) { // Swap if isSwap, ok := features.Attributes[SwapFeature].Elements["enabled"]; ok && isSwap == "true" { labels["swap"] = true + labels["swap.behavior"] = features.Attributes[SwapFeature].Elements["behavior"] } // NVDIMM @@ -106,12 +113,16 @@ func (s *memorySource) Discover() error { } else { s.features.Attributes[NumaFeature] = nfdv1alpha1.AttributeFeatureSet{Elements: numa} } - - // Detect Swap + // Detect Swap and Swap Behavior if swap, err := detectSwap(); err != nil { klog.ErrorS(err, "failed to detect Swap nodes") } else { s.features.Attributes[SwapFeature] = nfdv1alpha1.AttributeFeatureSet{Elements: swap} + swap["behavior"] = defaultSwapBehavior + if swapBehavior != "" { + swap["behavior"] = swapBehavior + } + } // Detect NVDIMM