kubernetes的api测试(python版本)


python访问kubernetes的api

测试

目前提供2种方式。

  • config文件方式访问
  • 通过token方式访问

通过config文件访问

pip install kubernetes

测试需要~/.kube下面的config文件。需要config文件copy到本地 测试

from kubernetes import client, config
config.kube_config.load_kube_config(config_file="kubeconfig.yaml")

获取api版本

v1 = client.CoreV1Api()
for ns in v1.list_namespace().items:
    print(ns.metadata.name)


print("Listing pods with their IPs:")
ret = v1.list_pod_for_all_namespaces(watch=False)
for i in ret.items:
    print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))

通过token访问

from kubernetes import client, config

def main():
    token="eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi10b2tlbi04cDQ5ayIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50Lm5hbWUiOiJhZG1pbiIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6IjE2MDcwMzFjLTBmY2UtMTFlOS1hNDVlLWZhMTYzZTgxMTZiNSIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDprdWJlLXN5c3RlbTphZG1pbiJ9.ajPUwa6oC497L1EiCRNCmWBs6_30BZ0o38vrAZWyzaG-fLudZSsqisAMTf9IKD0Djc2zHpvAJ8704p-PMWqIvv70ubpYIOAvuVniBzWApMyrikDb_tvQLtsxUhKDHJXzng_cTHEPruZ8BfGtKdqmfR67qcWICp-nGU22K9-rvQG8YceV45uXPkvh-Q9SBe6kYFsNU5sbkhIqkX07J6gErF5e_PXMbtLAgnvvwrMPBBCY2bzflB9DNAjO63muJCdVaaUysqAaHaiUBfx9jyqNQXsdyBS51N80RSY7aMR-MrKgUINbRX6i7E137t0whMJUT_ycZ1kEq1F5mgTuHOnHgw"
    APISERVER='https://10.0.7.100:16443'
    configuration=client.Configuration()
    configuration.host=APISERVER
    configuration.verify_ssl = False
    configuration.api_key = {"authorization": "Bearer " + token}
    client.Configuration.set_default(configuration)
    v1 = client.CoreV1Api()
    print("Listing pods with their IPs:")
    for ns in v1.list_namespace().items:
        print(ns.metadata.name)
    #ret = v1.list_pod_for_all_namespaces(watch=False)
    #for i in ret.items:
    #    print("%s\t%s\t%s" %
    #          (i.status.pod_ip, i.metadata.namespace, i.metadata.name))


if __name__=='__main__':
    main()

结果:

[root@localhost.localdomain 16:13 ~]
# python kube-token-test.py 
Listing pods with their IPs:
/usr/lib/python2.7/site-packages/urllib3/connectionpool.py:851: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
cephfs
default
ingress-nginx
kafka
kube-public
kube-system
liran-test
monitoring
stjr
test-kafka