柴少鹏的官方网站 技术在分享中进步,水平在学习中升华

ansible在Python中的使用(七)

#前面说的都是直接运行命令行执行ansible和ansible-playbook,但是如果搞自动化的话还是需要嵌套到程序中调用,这里就记录下Python如何使用Ansible做自动化。

#首先了解Ansible提供两种完成任务方式:Ad-Hoc模式,即命令集,适合解决一些简单或者平时工作中临时遇到的任务。PlayBook模式, 即Ansible-playbook剧本,适合解决复杂或需固化下来的任务。

一、Python API官网翻译-可直接略过

1.1 Python API

       该API供内部Ansible使用。 Ansible可以随时对此API进行更改,这可能会破坏与该API的较早版本的向后兼容性。 因此,Ansible不支持外部使用。从API角度来看,有多种使用Ansible的方法。 你可以使用Ansible Python API来控制节点,可以扩展Ansible以响应各种Python事件,可以编写插件,还可以从外部数据源插入清单数据。

      由于Ansible依赖于forking processes,因此该API并非线程安全的。

Python API(ansible2.7)范例:

      这个例子是一个简单的演示,展示了如何最少地运行几个tasks:

#!/usr/bin/env python 
import json
import shutil
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
import ansible.constants as C
class ResultCallback(CallbackBase):
    """一个示例回调插件,用于在结果进入时执行操作,如果要将所有结果收集到单个对象中以在以下位置进行处理执行结束后,研究利用json回调插件或编写自己的自定义回调插件
    """
    def v2_runner_on_ok(self, result, **kwargs):
        """打印结果的json表示此方法可以将结果存储在实例属性中,以供以后检索
        """
        host = result._host
        print(json.dumps({host.name: result._result}, indent=4))
#由于API是为CLI构建的,因此它希望始终设置某些选项,将其命名为元组“fakes(伪造)”了args解析选项对象
Options = namedtuple('Options', ['connection', 'module_path', 'forks', 'become', 'become_method', 'become_user', 'check', 'diff'])
options = Options(connection='local', module_path=['/to/mymodules'], forks=10, become=None, become_method=None, become_user=None, check=False, diff=False)

# 初始化所需的对象
loader = DataLoader() # 负责查找和读取yaml,json和ini文件
passwords = dict(vault_pass='secret')

# 实例化ResultCallback以处理传入的结果。Ansible希望它成为其主要的显示渠道之一。
results_callback = ResultCallback()

# 创建清单,使用路径将配置文件作为源或以逗号分隔的字符串作为主机
inventory = InventoryManager(loader=loader, sources='localhost,')
# variable manager负责合并所有不同的源,以便为你提供每种情况下可用变量的统一视图
variable_manager = VariableManager(loader=loader, inventory=inventory)

# 创建代表我们的工作(包括tasks)的数据结构,这基本上是我们的YAML加载程序在内部执行的操作。
play_source =  dict(
        name = "Ansible Play",
        hosts = 'localhost',
        gather_facts = 'no',
        tasks = [
            dict(action=dict(module='shell', args='ls'), register='shell_out'),
            dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}')))
         ]
    )
# 创建play object,,playbook objects use。load代替init或新方法,这也会根据play_source中提供的信息自动创建任务对象
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

#运行它-实例化任务队列管理器,它负责派生并设置所有对象以遍历主机列表和任务tqm = None
try:
    tqm = TaskQueueManager(
              inventory=inventory,
              variable_manager=variable_manager,
              loader=loader,
              options=options,
              passwords=passwords,
              stdout_callback=results_callback,  #使用我们的自定义回调而不是``default``的回调插件,该插件会打印到stdout
          )
    result = tqm.run(play) # play中最有趣的数据实际上已发送到回调的方法
finally:
    # 我们总是需要清理子进程以及我们用来与之通信的结构
    if tqm is not None:
        tqm.cleanup()

    # 删除Ansible的tmpdir
    shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

Ansible通过显示对象发出警告和错误,该对象直接打印到stdout,stderr和Ansible日志。Ansible命令行工具的源代码(lib/ansible/cli/)在Github上可用。

Python API(ansible2.10)范例:


#!/usr/bin/env python

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import json
import shutil

import ansible.constants as C
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase
from ansible.vars.manager import VariableManager
from ansible import context

# 创建一个回调插件,以便我们可以捕获输出
class ResultsCollectorJSONCallback(CallbackBase):
    """一个示例回调插件,用于在结果出现时执行操作。如果要将所有结果收集到单个对象中,以在执行结束时进行处理,请考虑使用json回调插件或编写自己的自定义 回调插件。
    """

    def __init__(self, *args, **kwargs):
        super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        host = result._host
        self.host_unreachable[host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """打印结果的json表示形式。另外,将结果存储在实例属性中,以供以后检索"""
        host = result._host
        self.host_ok[host.get_name()] = result
        print(json.dumps({host.name: result._result}, indent=4))

    def v2_runner_on_failed(self, result, *args, **kwargs):
        host = result._host
        self.host_failed[host.get_name()] = result
def main():
    host_list = ['localhost', 'www.example.com', 'www.google.com']
    # 由于API是为CLI构造的,因此希望始终在上下文对象中设置某些选项
    context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,
                                    become_method=None, become_user=None, check=False, diff=False)
    #需要https://github.com/ansible/ansible/blob/devel/lib/ansible/inventory/manager.py#L204
    sources = ','.join(host_list)
    if len(host_list) == 1:
        sources += ','

    # 初始化所需的对象
    loader = DataLoader()  # 负责查找和读取yaml,json和ini文件
    passwords = dict(vault_pass='secret')

    #实例化我们的ResultsCollectorJSONCallback以处理传入的结果。Ansible希望它是其主要的显示渠道之一。
    results_callback = ResultsCollectorJSONCallback()

    # 创建清单,使用路径将配置文件作为源或以逗号分隔的字符串作为主机
    inventory = InventoryManager(loader=loader, sources=sources)

    # variable manager负责合并所有不同的源,以便为你提供每种情况下可用变量的统一视图
    variable_manager = VariableManager(loader=loader, inventory=inventory)

    # 实例化任务队列管理器,负责forking和设置所有对象以遍历主机列表和任务
    # 重要说明:这还将库目录路径添加到模块加载器
    #重要事项:因此必须在调用Play.load()之前将其初始化。
    tqm = TaskQueueManager(
        inventory=inventory,
        variable_manager=variable_manager,
        loader=loader,
        passwords=passwords,
        stdout_callback=results_callback,  # 使用我们的自定义回调而不是默认的回调插件,该插件会打印到stdout
    )

    # 创建表示我们的工作(包括tasks)的数据结构,这基本上是我们的YAML加载程序在内部执行的操作。
    play_source = dict(
        name="Ansible Play",
        hosts=host_list,
        gather_facts='no',
        tasks=[
            dict(action=dict(module='shell', args='ls'), register='shell_out'),
            dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))),
            dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))),
        ]
    )

    #创建play object,,playbook objects use.load代替init或新方法,这也会根据play_source中提供的信息自动创建任务对象
    play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

    # 运行它
    try:
        result = tqm.run(play)  # play中最有趣的数据实际上已发送到回调的方法
    finally:
        # 我们总是需要清理子进程以及我们用来与之通信的结构
        tqm.cleanup()
        if loader:
            loader.cleanup_all_tmp_files()

    # 删除Ansible的tmpdir
    shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

    print("UP ***********")
    for host, result in results_callback.host_ok.items():
        print('{0} >>> {1}'.format(host, result._result['stdout']))

    print("FAILED *******")
    for host, result in results_callback.host_failed.items():
        print('{0} >>> {1}'.format(host, result._result['msg']))

    print("DOWN *********")
    for host, result in results_callback.host_unreachable.items():
        print('{0} >>> {1}'.format(host, result._result['msg']))if __name__ == '__main__':
    main()

1.2  Appendix: Module Utilities(模块实用程序)

        Ansible提供了许多模块实用程序,这些实用程序提供了在开发自己的模块时可以使用的辅助功能。 basic.py模块实用程序提供了用于访问Ansible库的主要入口点,并且所有Python Ansible模块至少必须导入AnsibleModule:

from ansible.module_utils.basic import AnsibleModule

       以下是module_utils文件的列表和一般说明。 模块实用程序源代码位于主Ansible路径下的./lib/ansible/module_utils目录中-有关任何特定模块实用程序的更多详细信息,请参见源代码。

api.py-添加了对通用API模块的共享支持。

azure_rm_common.py-Microsoft Azure资源管理器模板部署的定义和实用程序。

basic.py-Ansible模块的常规定义和帮助程序。

cloudstack.py-CloudStack模块的实用程序。

database.py-PostGRES和MySQL的辅助功能

docker_common.py-用于Docker的模块的定义和帮助程序实用程序。

ec2.py-用于Amazon EC2的模块的定义和实用程序

facts/-包含用于返回事实的模块的辅助功能的文件夹。有关更多信息,请参见https://github.com/ansible/ansible/pull/23012。

gce.py-与Google Compute Engine资源一起使用的模块的定义和帮助程序功能。

ismount.py-包含单个帮助程序功能,用于修复os.path.ismount

keycloak.py-使用Keycloak API的模块的定义和帮助程序功能

known_hosts.py-用于使用known_hosts文件的实用程序

manageiq.py-与ManageIQ平台及其资源配合使用的模块的功能和实用程序。

memset.py-与Memset的API进行交互的帮助程序功能和实用程序。

mysql.py-允许模块连接到MySQL实例

netapp.py-与NetApp存储平台一起使用的模块的功能和实用程序。

openstack.py-适用于Openstack实例的模块的实用程序。

openswitch.py-管理OpenSwitch设备的模块的定义和帮助功能

powershell.ps1-用于Microsoft Windows客户端的实用程序

pure.py-与Pure Storage存储平台一起使用的模块的功能和实用程序。

pycompat24.py-Python 2.4的例外解决方法。

rax.py-使用Rackspace资源的模块的定义和帮助程序功能。

redhat.py-用于管理Red Hat Network注册和订阅的模块的功能

service.py-包含使模块能够与Linux服务一起使用的实用程序(占位符,未使用)。

shell.py-允许模块创建shell并使用shell命令的函数

six/__init__.py-六个Python库的捆绑副本,以帮助编写与Python2和Python3兼容的代码。

splitter.py-用于Jinja2模板的字符串拆分和处理实用程序

urls.py-用于处理HTTP和https请求的实用程序

vca.py-包含与VMware vCloud Air一起使用的模块的实用程序

vmware.py-包含适用于VMware vSphere VM的模块的实用程序

network的参考官网文档 :https://docs.ansible.com/ansible/latest/dev_guide/developing_module_utilities.html

开发人员指南:https://docs.ansible.com/ansible/latest/dev_guide/index.html

1.3 Developing dynamic inventory

       Ansible可以使用提供的清单插件从动态资源(包括云资源)中提取清单信息。 如果现有插件当前未涵盖所需的源,则可以像其他任何插件类型一样创建自己的清单插件。

      在以前的版本中,你必须创建一个脚本或程序,当使用适当的参数调用时,该脚本或程序可以以正确的格式输出JSON。 你仍然可以使用和编写清单脚本,因为ansible通过脚本清单插件确保了向后兼容性,并且对所使用的编程语言没有限制。 但是,如果选择编写脚本,则需要自己实现一些功能,例如caching, configuration management, dynamic variable and group composition(缓存,配置管理,动态变量和组组成)等。 如果改用清单插件,则可以利用Ansible代码库并自动添加这些常用功能。

     官网链接:https://docs.ansible.com/ansible/latest/dev_guide/developing_inventory.html#developing-inventory

Inventory sources

      Inventory来源是Inventory插件使用的输入字符串。 清单资源可以是文件或脚本的路径,也可以是插件可以解释的原始数据。下表显示了清单插件的一些示例以及你可以在命令行上使用-i传递给它们的源类型。

host list:以逗号分隔的主机列表
yaml:YAML格式数据文件的路径
constructed:YAML配置文件的路径
ini:INI格式的数据文件的路径
virtualbox:YAML配置文件的路径
script plugin:输出JSON的可执行文件的路径

Inventory plugins

       与大多数插件类型(模块除外)一样,清单插件必须使用Python开发。 它们在控制器上执行,因此应遵守控制节点的要求。

      通常,清单插件是在运行开始时以及加载playbooks, plays, or roles之前执行的。 但是,你可以使用meta:refresh_inventory任务清除当前清单并再次执行清单插件,此任务将生成一个新清单。

      如果你使用持久性缓存,则清单插件还可以使用配置的缓存插件来存储和检索数据。 缓存清单避免了重复和昂贵的外部调用。

Developing an inventory plugin(开发inventory plugin)

要做的第一件事是使用基类:

from ansible.plugins.inventory import BaseInventoryPlugin

class InventoryModule(BaseInventoryPlugin):
    NAME = 'myplugin'  # 由Ansible内部使用,应与文件名匹配,但不是必需的

      如果inventory plugin在集合中,则NAME的格式应为“namespace.collection_name.myplugin”。 基类具有每个插件应实现的几个方法,以及一些用于解析清单源和更新清单的助手。基本插件正常工作后,你可以通过添加更多基类来合并其他功能:

from ansible.plugins.inventory import BaseInventoryPlugin, Constructable, Cacheable

class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
    NAME = 'myplugin'

      对于插件中的大部分工作,我们通常要处理2个方法verify_file和parse。

verify_file method

       Ansible使用此方法快速确定插件是否可以使用inventory资源。 确定不一定是100%准确的,因为插件可以处理的内容可能有所重叠,并且默认情况下,Ansible将按照其启用顺序尝试启用的插件。

def verify_file(self, path):
    ''' 如果可能是此插件可以使用的有效文件,则返回true/false '''
    valid = False
    if super(InventoryModule, self).verify_file(path):
        # 基类验证文件是否存在并且当前用户可以读取
        if path.endswith(('virtualbox.yaml', 'virtualbox.yml', 'vbox.yaml', 'vbox.yml')):
            valid = True
    return valid

       在上面的示例中,从virtualbox inventory plugin中,我们筛选了特定的文件名模式,以避免尝试消耗任何有效的YAML文件。 可以在此处添加任何类型的条件,但最常见的是“extension matching(扩展名匹配)”。 如果为YAML配置文件实现扩展名匹配,则应接受路径后缀 <plugin_name>.<yml|yaml>。 所有有效的扩展名都应记录在插件说明中。

      以下是另一个示例,该示例不使用“file”,而是使用主机列表插件中的host list plugin字符串本身:

def verify_file(self, path):
    ''' 不要调用基类,因为我们don't expect a path,而是主机列表 '''
    host_list = path
    valid = False
    b_path = to_bytes(host_list, errors='surrogate_or_strict')
    if not os.path.exists(b_path) and ',' in host_list:
        # 该路径不存在,并且用逗号表示这是一个“host list”
        valid = True
    return valid

      此方法只是为了加快清单编制过程,避免不必要的源解析,这些源很容易在引起解析错误之前就被过滤掉。

parse method

此方法完成了插件中的大部分工作。 它采用以下参数:

inventory: 具有现有数据的清单对象以及将hosts/groups/variables添加到清单的方法
loader: Ansible的DataLoader。 DataLoader可以读取文件,自动加载JSON/YAML和解密存储的数据以及缓存读取的文件。
path: 带有inventory source的字符串(通常是路径,但不是必需的)
cache: 指示插件是否应使用或避免缓存(缓存插件和/或加载程序)

基类会做一些最小的分配,以便在其他方法中重用。

def parse(self, inventory, loader, path, cache=True):

 self.loader = loader
 self.inventory = inventory
 self.templar = Templar(loader=loader)

现在由插件来解析提供的inventory source 并将其转换为Ansible清单。 为方便起见,下面的示例使用了一些辅助函数:

NAME = 'myplugin'

def parse(self, inventory, loader, path, cache=True):

     # 调用基本方法以确保属性可与其他辅助方法一起使用
     super(InventoryModule, self).parse(inventory, loader, path, cache)

     # 此方法将解析“common format式”的inventory source,并根据需要更新DOCUMENTATION中声明的所有选项    
     config = self._read_config_data(path)

     # 如果不使用_read_config_data,则应直接调用set_options来处理此插件的任何已定义配置,如果未定义任何选项,则可以跳过
     #self.set_options()

     # 来自inventory source的示例消费选项
     mysession = apilib.session(user=self.get_option('api_user'),
                                password=self.get_option('api_pass'),
                                server=self.get_option('api_server')
     )


     # 提出获取数据以将其输入清单的请求
     mydata = mysession.getitall()

     #解析数据并创建inventory对象:
     for colo in mydata:
         for server in mydata[colo]['servers']:
             self.inventory.add_host(server['name'])
             self.inventory.set_variable(server['name'], 'ansible_host', server['external_ip'])

       具体细节将根据API和返回的结构而有所不同。 请记住,如果收到库存源错误或任何其他问题,则应引发AnsibleParserError以使Ansible知道源无效或过程失败。有关如何实现清单插件的示例,请参见此处的源代码:lib/ansible/plugins/inventory。

inventory cache

To cache the inventory,请使用stock_cache文档片段扩展清单插件文档,并使用Cacheable基类。

extends_documentation_fragment:
  - inventory_cache
class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):

    NAME = 'myplugin'

       接下来,加载用户指定的缓存插件,以读取和更新缓存。 如果你的清单插件使用基于YAML的配置文件和_read_config_data方法,则将在该方法中加载缓存插件。 如果你的清单插件不使用_read_config_data,则必须使用load_cache_plugin显式加载缓存。

NAME = 'myplugin'
def parse(self, inventory, loader, path, cache=True):
    super(InventoryModule, self).parse(inventory, loader, path)

    self.load_cache_plugin()

      使用缓存插件之前,必须使用get_cache_key方法检索唯一的缓存键。 所有库存模块都需要使用缓存来完成此任务,以免use/overwrite缓存的其他部分。

def parse(self, inventory, loader, path, cache=True):
    super(InventoryModule, self).parse(inventory, loader, path)

    self.load_cache_plugin()
    cache_key = self.get_cache_key(path)

       现在,你已启用缓存,加载了正确的插件并检索了唯一的缓存键,你可以使用parse方法的cache参数来设置缓存和inventory资源之间的数据流。 该值来自inventory管理器,指示是否正在刷新库存(例如通过--flush-cache或元任务refresh_inventory)。 尽管刷新时不应使用缓存来填充清单,但是如果用户启用了缓存,则应使用新清单更新缓存。 您可以像字典一样使用self._cache。 以下模式允许刷新清单以与缓存一起使用。

def parse(self, inventory, loader, path, cache=True):
    super(InventoryModule, self).parse(inventory, loader, path)

    self.load_cache_plugin()
    cache_key = self.get_cache_key(path)

    # 此时,cache可能为True或False,以指示是否正在刷新inventory资源。同样也要获取用户的cache选项,以查看是否应在更改缓存时保存它
    user_cache_setting = self.get_option('cache')

    # 如果用户已启用缓存并且未刷新缓存,则读取
    attempt_to_read_cache = user_cache_setting and cache
    #如果用户已启用缓存并且正在刷新缓存,则更新; 如果缓存已在下面过期,请将此值更新为True
    cache_needs_update = user_cache_setting and not cache

    # 如果未刷新inventory并且用户启用了缓存,则尝试读取缓存
    if attempt_to_read_cache:
        try:
            results = self._cache[cache_key]
        except KeyError:
            #如果cache_key不在缓存中或cache_key过期,则会发生这种情况,因此需要更新缓存
            cache_needs_update = True

    if cache_needs_update:
        results = self.get_inventory()

        # set the cache
        self._cache[cache_key] = results

    self.populate(results)

      解析方法完成后,如果缓存的内容已更改,那么self._cache的内容将用于设置缓存插件。

还有其他三种可用的缓存方法:

set_cache_plugin #在parse方法完成之前,强制使用self._cache的内容设置缓存插件
update_cache_if_changed #仅在parse方法完成之前修改self._cache时设置缓存插件
clear_cache  #最终通过调用缓存插件的flush()方法刷新缓存,缓存的实现取决于所使用的特定缓存插件。 请注意,如果用户对事实和清单使用相同的缓存后端,则两者都会被刷新。 为了避免这种情况,用户可以在其清单插件配置中指定不同的缓存后端。

Common format for inventory sources( inventory sources使用格式)

      为了简化开发,大多数插件都使用基于YAML的标准配置文件作为清单资源。 该文件只有一个必填字段插件,其中应包含预计会消耗该文件的插件的名称。 根据所使用的其他常用功能,您可能需要其他字段,并且可以根据需要在每个插件中添加自定义选项。 例如,如果你使用集成的缓存,则可能存在cache_plugin,cache_timeout和其他与缓存相关的字段。

The ‘auto’ plugin

     从Ansible 2.5开始,包括auto inventory插件,并默认启用它。 如果标准配置文件中的插件字段与库存插件的名称匹配,则自动库存插件将加载你的插件。 “auto”插件使你无需更新配置即可更轻松地使用你的插件。

Inventory scripts

即使现在有了清单插件,仍然支持清单脚本,不仅为了向后兼容,而且还允许用户利用其他编程语言。

Inventory script conventions(Inventory script约定)

       Inventory scripts必须接受--list和--host <hostname>参数。 尽管允许使用其他参数,但Ansible不会使用它们。 这样的参数对于直接执行脚本可能仍然有用。

       当使用单个参数--list调用脚本时,脚本必须输出到stdout,其中包含要管理的所有组的JSON编码的哈希或字典。 每个组的值应该是包含每个主机列表,任何子组和潜在组变量的哈希或字典,或者仅是主机列表:

{
    "group001": {
        "hosts": ["host001", "host002"],
        "vars": {
            "var1": true
        },
        "children": ["group002"]
    },
    "group002": {
        "hosts": ["host003","host004"],
        "vars": {
            "var2": 500
        },
        "children":[]
    }}

      如果组中的任何元素为空,则可以从输出中将其省略。当使用参数--host <hostname>(其中<hostname>是上面的主机)进行调用时,脚本必须打印一个空的JSON哈希/字典或变量的哈希/字典,以使它们可用于模板和剧本 。 例如:

{
    "VAR001": "VALUE",
    "VAR002": "VALUE",}

打印变量是可选的。 如果脚本不打印变量,则应打印一个空的哈希或字典。

Tuning the external inventory script(调整外部inventory script)

      1.3版的新功能。上面提到的inventory script系统适用于所有版本的Ansible,但是为每个主机调用--host效率很低,尤其是在涉及对远程子系统的API调用的情况下。

      为了避免这种低效率,如果清单脚本返回称为“ _meta”的顶级元素,则可以在一次脚本执行中返回所有主机变量。 当此meta元素包含“ hostvars”的值时,将不会为每个主机使用--host调用清单脚本。 此行为导致大量主机的性能显着提高。

     要添加到顶级JSON字典中的数据如下所示:


{

    # 如上所述的清单脚本的结果转到此处
    # ...

    "_meta": {
        "hostvars": {
            "host001": {
                "var001" : "value"
            },
            "host002": {
                "var002": "value"
            }
        }
    }}

为了满足使用_meta的要求,要防止ansible用--host调用清单,必须至少使用空的hostvars字典填充_meta。 例如:

{

    # 如上所述的清单脚本的结果转到此处
    # ...

    "_meta": {
        "hostvars": {}
    }}

       如果你打算用inventory脚本替换现有的静态inventory文件,则它必须返回一个JSON对象,该对象包含一个“all”组,该组包含库存中的每个主机作为成员,库存中的每个组作为子对象。 它还应包括一个“ungrouped”组,其中包含不是任何其他组成员的所有主机。 此JSON对象的骨架示例是:

{
    "_meta": {
      "hostvars": {}
    },
    "all": {
      "children": [
        "ungrouped"
      ]
    },
    "ungrouped": {
      "children": [
      ]
    }}

一种简单方法是使用ansible-inventory,它也像清单脚本一样支持--list和--host参数。

二、Python调用Ansible API实例

#下面是参照网上的例子写的一些样例

# ansible --version

ansible 2.10.6
python version = 3.6.7

2.1 获取资产信息

获取组或主机:

# cat /tmp/ansible/test_hosts.txt   #先创建一个静态的资产列表

[test]
192.168.1.164 ansible_ssh_host=192.168.1.164 ansible_ssh_port=22 ansible_ssh_user=root ansible_ssh_pass='51niux.com'

[test:vars]
name=51niux

# cat /tmp/ansible/get_assets.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from collections import namedtuple
# 核心类
# 用于读取YAML和JSON格式的文件
from ansible.parsing.dataloader import DataLoader
# 用于存储各类变量信息
from ansible.vars.manager import VariableManager
# 用于导入资产文件
from ansible.inventory.manager import InventoryManager

# InventoryManager类的调用方式
def InventoryManagerStudy():
    DL = DataLoader()
    # loader= 表示是用什么方式来读取文件  sources=就是资产文件列表,里面可以是相对路径也可以是绝对路径
    IM = InventoryManager(loader=DL, sources=["/tmp/ansible/test_hosts.txt"])

    # 获取指定资产文件中所有的组以及组里面的主机信息,返回的是字典,组名是键,主机列表是值
    allGroups = IM.get_groups_dict()
    print(allGroups)

    # 获取指定组的主机列表
    print(IM.get_groups_dict().get("test"))

    # 获取指定主机,这里返回的是host的实例
    host = IM.get_host("192.168.1.164")
    print(host)
    # 获取该主机所有变量
    print(host.get_vars())
    # 获取该主机所属的组
    print(host.get_groups())

def main():
    InventoryManagerStudy()

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(e)
    finally:
        sys.exit()

# python get_assets.py

{'all': ['192.168.1.164'], 'ungrouped': [], 'test': ['192.168.1.164']}
['192.168.1.164']
192.168.1.164
{'inventory_file': '/tmp/ansible/test_hosts.txt', 'inventory_dir': '/tmp/ansible', 'ansible_ssh_host': '192.168.1.164', 'ansible_ssh_port': 22, 'ansible_ssh_user': 'root', 
'ansible_ssh_pass': '51niux.com', 'inventory_hostname': '192.168.1.164', 'inventory_hostname_short': '10', 'group_names': ['test']}
[test, all]

获取变量

# cat get_vars.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from collections import namedtuple
# 核心类
# 用于读取YAML和JSON格式的文件
from ansible.parsing.dataloader import DataLoader
# 用于存储各类变量信息
from ansible.vars.manager import VariableManager
# 用于导入资产文件
from ansible.inventory.manager import InventoryManager

# VariableManager类的调用方式
def VariablManagerStudy():
    DL = DataLoader()
    IM = InventoryManager(loader=DL, sources=["/tmp/ansible/test_hosts.txt"])
    VM = VariableManager(loader=DL, inventory=IM)

    # 动态添加变量
    VM.set_host_variable(host="192.168.1.164", varname="env_type", value="online")
    # 必须要先获取主机,然后查询特定主机才能看到某个主机的变量
    host = IM.get_host("192.168.1.164")
    # 获取指定主机的变量
    print(VM.get_vars(host=host))

def main():
    VariablManagerStudy()

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(e)
    finally:
        sys.exit()

# python get_vars.py |grep env_type

image.png

博文来自:www.51niux.com

如何获取帮助使用说明

好了那么问题来了,我想了解其他的使用方法怎么了解?可以使用python的help。

# python

>>> import ansible

PACKAGE CONTENTS
    _vendor (package)
    cli (package)
    collections (package)
    compat (package)
    config (package)
    constants
    context
    errors (package)
    executor (package)
    galaxy (package)
    inventory (package)
    module_utils (package)
    modules (package)
    parsing (package)
    playbook (package)
    plugins (package)
    release
    template (package)
    utils (package)
    vars (package)

>>> from ansible.inventory.manager import InventoryManager

>>> help(ansible.inventory.manager.InventoryManager)

Help on class InventoryManager in module ansible.inventory.manager:

class InventoryManager(builtins.object)
 |  Creates and manages inventory
    ......

>>> from ansible.vars.manager import VariableManager

>>> help(ansible.vars.manager.VariableManager)

Help on class VariableManager in module ansible.vars.manager:

class VariableManager(builtins.object)
 |  Methods defined here:
 |
 |  __getstate__(self)
 ......

#查看其他模块使用的方式同样如此,这样除了网上找的例子,也可以自己进行帮助文档查阅,因为ansible的api经常有变化吗,如果出现调用又问题也可以查看相关的函数调用方式是不是发生了改变。

#当然也可以这样看,多看点:

>>> help(ansible.vars.manager)

2.2 执行adhoc

# yum install sshpass -y  #防止出现下面的错误

fatal: [192.168.1.164]: FAILED! => {"msg": "to use the 'ssh' connection type with passwords, you must install the sshpass program"}

#当然如果你按照网上找的那些例子可能会出现下面的报错:

fatal: [192.168.1.164]: FAILED! => {"msg": "the connection plugin '<class 'ansible.utils.sentinel.Sentinel'>' was not found"}

#是因为新的方式已经弃用了Options = namedtuple()而改为了context.CLIARGS = ImmutableDict()方式

# cat /etc/ansible/ansible.cfg

[defaults]
host_key_checking=False
command_warnings=False

#上面的两个参数第一个是为了防止读取known_hosts文件而出现下面的报错:

fatal: [192.168.1.164]: FAILED! => {"msg": "Using a SSH password instead of a key is not possible because Host Key checking is enabled and sshpass does not support this.  
Please add this host's fingerprint to your known_hosts file to manage this host."}

#上面的两个参数第二个是为了解决WARING提醒:

[WARNING]: Consider using the file module with state=touch rather than running 'touch'.  If you need to use command because file is insufficient you can add 'warn: false' to this command
task or set 'command_warnings=False' in ansible.cfg to get rid of this message.

#vim run_adhoc.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
#from collections import namedtuple
# 核心类
# 用于读取YAML和JSON格式的文件
from ansible.parsing.dataloader import DataLoader
# 用于存储各类变量信息
from ansible.vars.manager import VariableManager
# 用于导入资产文件
from ansible.inventory.manager import InventoryManager
# 操作单个主机信息
from ansible.inventory.host import Host
# 操作单个主机组信息
from ansible.inventory.group import Group
# 存储执行hosts的角色信息
from ansible.playbook.play import Play
# ansible底层用到的任务队列
from ansible.executor.task_queue_manager import TaskQueueManager
# 核心类执行playbook
from ansible.executor.playbook_executor import PlaybookExecutor
# 新增下面两个调用
from ansible.module_utils.common.collections import ImmutableDict
from ansible import context


def adhoc():
    """
    ad-hoc 调用
    资产配置信息  这个是通过 InventoryManager和VariableManager 定义
    执行选项 这个是通过namedtuple来定义
    执行对象和模块 通过dict()来定义
    定义play 通过Play来定义
    最后通过 TaskQueueManager 的实例来执行play
    :return:
    """
    # 资产配置信息
    DL = DataLoader()
    IM = InventoryManager(loader=DL, sources="/tmp/ansible/test_hosts.txt")
    VM = VariableManager(loader=DL, inventory=IM)

    #在这里传递一些参数
    context.CLIARGS = ImmutableDict(connection='smart', module_path=None, verbosity=5,forks=10, become=None, become_method=None,become_user=None, check=False, diff=False,host_key_checking=False)
    # play的执行对象和模块,这里设置hosts,其实是因为play把play_source和资产信息关联后,执行的play的时候它会去资产信息中设置的sources的hosts文件中
    # 找你在play_source中设置的hosts是否在资产管理类里面。
    play_source = dict(name="Ansible Play",  # 任务名称
                       hosts="192.168.1.164",  # 目标主机,可以填写具体主机也可以是主机组名称
                       gather_facts="no",  # 是否收集配置信息

                       # tasks是具体执行的任务,列表形式,每个具体任务都是一个字典
                       tasks=[
                           dict(action=dict(module="shell", args="touch /tmp/hoc_test"))
                       ])
    print(play_source)
    # 定义play
    play = Play().load(play_source, variable_manager=VM, loader=DL)
    passwords = dict()  # 这个可以为空,因为在hosts文件中
    #可以看到options这个参数我已经注释掉了,因为已经不支持这个参数了
    TQM = TaskQueueManager(
              inventory=IM,
              variable_manager=VM,
              loader=DL,
              #options=options,
              passwords=passwords,
          )
    result = TQM.run(play)

def main():
    adhoc()

if __name__ == "__main__":
   main()

# python run_adhoc.py   #从下面的执行结果可以看到没有报错并且Play已经执行了

{'name': 'Ansible Play', 'hosts': '192.168.1.164', 'gather_facts': 'no', 'tasks': [{'action': {'module': 'shell', 'args': 'touch /tmp/hoc_test'}}]}

PLAY [Ansible Play] **************************************************************************************************************************************************************************

TASK [shell] *********************************************************************************************************************************************************************************
changed: [192.168.1.164]

#下面上执行的机器查看一下执行效果

# ls -l /tmp/hoc_test

-rw-r--r-- 1 root root 0 3月   9 16:40 /tmp/hoc_test

# cat /var/log/messages|grep "hoc_test"   #可以看到系统里面也是有执行这条命令的

Mar  9 16:40:48 192.168.1.164 ansible-ansible.legacy.command: Invoked with _raw_params=touch /tmp/hoc_test _uses_shell=True warn=False stdin_add_newline=True strip_empty_ends=True argv=None chdir=None executable=None creates=None removes=None stdin=None

#还是要活用python的help方式查看帮助,就算以后版本升级有什么变化也能及时排错

>>> from ansible.executor.task_queue_manager import TaskQueueManager

>>> help(TaskQueueManager)

Help on class TaskQueueManager in module ansible.executor.task_queue_manager:

class TaskQueueManager(builtins.object)
 |  This class handles the multiprocessing requirements of Ansible by
 |  creating a pool of worker forks, a result handler fork, and a
 |  manager object with shared datastructures/queues for coordinating
 |  work between all processes.
 |
 |  The queue manager is responsible for loading the play strategy plugin,
 |  which dispatches the Play's tasks to hosts.
 |
 |  Methods defined here:
 |
 |  __init__(self, inventory, variable_manager, loader, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False, forks=None)
 |      Initialize self.  See help(type(self)) for accurate signature.
 ......

2.3 执行playbook

# python run_playbook.py   #这就是原来旧的例子执行的话会有报错

Traceback (most recent call last):
  File "run_playbook.py", line 50, in <module>
    main()
  File "run_playbook.py", line 47, in main
    execplaybook()
  File "run_playbook.py", line 42, in execplaybook
    playbook.run()
  File "/usr/local/python3/lib/python3.6/site-packages/ansible/executor/playbook_executor.py", line 146, in run
    if context.CLIARGS['syntax']:
  File "/usr/local/python3/lib/python3.6/site-packages/ansible/module_utils/common/collections.py", line 20, in __getitem__
    return self._store[key]
KeyError: 'syntax'
#然后你解决了这个报错,还会有一个报错:
KeyError: 'start_at_task'

# vim /usr/local/python3/lib/python3.6/site-packages/ansible/executor/playbook_executor.py  #可以看下报错的地方

        if context.CLIARGS['syntax']:
            display.display("No issues encountered")
            return result

        if context.CLIARGS['start_at_task'] and not self._tqm._start_at_done:
            display.error(
                "No matching task \"%s\" found."
                " Note: --start-at-task can only follow static includes."
                % context.CLIARGS['start_at_task']
            )

        return result

#下面是完整的测试程序

# cat test.yml   #先编写一个最简单测试palybook

- hosts: all
  tasks:
   - name: "test echo"
     shell: echo 'haha No.1 PLAYBOOK' >>/tmp/hoc_test

# vim run_playbook.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
#from collections import namedtuple
# 核心类
# 用于读取YAML和JSON格式的文件
from ansible.parsing.dataloader import DataLoader
# 用于存储各类变量信息
from ansible.vars.manager import VariableManager
# 用于导入资产文件
from ansible.inventory.manager import InventoryManager
# 操作单个主机信息
from ansible.inventory.host import Host
# 操作单个主机组信息
from ansible.inventory.group import Group
# 存储执行hosts的角色信息
from ansible.playbook.play import Play
# ansible底层用到的任务队列
from ansible.executor.task_queue_manager import TaskQueueManager
# 核心类执行playbook
from ansible.executor.playbook_executor import PlaybookExecutor
# 新增下面两个调用
from ansible.module_utils.common.collections import ImmutableDict
from ansible import context

def execplaybook():
    """
    调用playboo大致和调用ad-hoc相同,只是真正调用的是使用PlaybookExecutor
    """
    # 资产配置信息
    DL = DataLoader()
    IM = InventoryManager(loader=DL, sources="/tmp/ansible/test_hosts.txt")
    VM = VariableManager(loader=DL, inventory=IM)

    #执行选项,这个类不是ansible的类,这个的功能就是为了构造参数,特别注意在新版本多加了两个传参syntax=None,start_at_task=None
    context.CLIARGS = ImmutableDict(connection='smart', module_path=None, verbosity=5,forks=10, become=None, become_method=None,become_user=None, check=False, diff=False,host_key_checking=False,syntax=None,start_at_task=None)
    passwords = dict()  # 这个可以为空,因为在hosts文件中,而且一般也是秘钥登录

    # playbooks参数里面放的就是playbook文件路径
    playbook = PlaybookExecutor(playbooks=["test.yml"], inventory=IM, variable_manager=VM, loader=DL, passwords=passwords)
    playbook.run()

def main():
    execplaybook()

if __name__ == "__main__":
   main()

# python run_playbook.py

PLAY [all] ***********************************************************************************************************************************************************************************

TASK [Gathering Facts] ***********************************************************************************************************************************************************************
ok: [192.168.1.164]

TASK [test echo] *****************************************************************************************************************************************************************************
changed: [192.168.1.164]

PLAY RECAP ***********************************************************************************************************************************************************************************
192.168.1.164               : ok=2    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0

#在目标主机查看一下测试文件:

# cat /tmp/hoc_test

haha No.1 PLAYBOOK

2.3 带有callback的执行adhoc和playbook

执行adhoc

#上面看到输出结果都是ansible自带的输出方式,那么怎么把执行方式改为我们自定义的执行方式呢?那就用callback来设置一下执行结果的输出

# cat run_call_adhoc.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
#from collections import namedtuple
# 核心类
# 用于读取YAML和JSON格式的文件
from ansible.parsing.dataloader import DataLoader
# 用于存储各类变量信息
from ansible.vars.manager import VariableManager
# 用于导入资产文件
from ansible.inventory.manager import InventoryManager
# 操作单个主机信息
from ansible.inventory.host import Host
# 操作单个主机组信息
from ansible.inventory.group import Group
# 存储执行hosts的角色信息
from ansible.playbook.play import Play
# ansible底层用到的任务队列
from ansible.executor.task_queue_manager import TaskQueueManager
# 核心类执行playbook
from ansible.executor.playbook_executor import PlaybookExecutor
# 状态回调,各种成功失败的状态
from ansible.plugins.callback import CallbackBase
# 新增下面两个调用
from ansible.module_utils.common.collections import ImmutableDict
from ansible import context

class ResultsCollectorJSONCallback(CallbackBase):
    """
    通过api调用ac-hoc的时候输出结果很多时候不是很明确或者说不是我们想要的结果,主要它还是输出到STDOUT,而且通常我们是在工程里面执行
    这时候就需要后台的结果前端可以解析,正常的API调用输出前端很难解析。 对比之前的执行 adhoc()查看区别。
    为了实现这个目的就需要重写CallbackBase类,需要重写下面三个方法
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)  # python3中重载父类构造方法的方式,在Python2中写法会有区别。
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """
        重写 unreachable 状态
        :param result:  这是父类里面一个对象,这个对象可以获取执行任务信息
        """
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """
        重写 ok 状态
        :param result:
        """
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """
        重写 failed 状态
        :param result:
        """
        self.host_failed[result._host.get_name()] = result


def adhoc():
    """
    ad-hoc 调用
    资产配置信息  这个是通过 InventoryManager和VariableManager 定义
    执行选项 这个是通过namedtuple来定义
    执行对象和模块 通过dict()来定义
    定义play 通过Play来定义
    最后通过 TaskQueueManager 的实例来执行play
    :return:
    """
    # 资产配置信息
    DL = DataLoader()
    IM = InventoryManager(loader=DL, sources="/tmp/ansible/test_hosts.txt")
    VM = VariableManager(loader=DL, inventory=IM)

    #在这里传递一些参数
    context.CLIARGS = ImmutableDict(connection='smart', module_path=None, verbosity=5,forks=10, become=None, become_method=None,become_user=None, check=False, diff=False,host_key_checking=False)
    # play的执行对象和模块,这里设置hosts,其实是因为play把play_source和资产信息关联后,执行的play的时候它会去资产信息中设置的sources的hosts文件中
    # 找你在play_source中设置的hosts是否在资产管理类里面。
    play_source = dict(name="Ansible Play",  # 任务名称
                       hosts="192.168.1.164",  # 目标主机,可以填写具体主机也可以是主机组名称
                       gather_facts="no",  # 是否收集配置信息

                       # tasks是具体执行的任务,列表形式,每个具体任务都是一个字典
                       tasks=[
                           dict(action=dict(module="shell", args="touch /tmp/hoc_test"))
                       ])
    # 定义play
    play = Play().load(play_source, variable_manager=VM, loader=DL)
    passwords = dict()  # 这个可以为空,因为在hosts文件中

    results_callback = ResultsCollectorJSONCallback()  # 实例化自定义callback

    TQM = TaskQueueManager(
              inventory=IM,
              variable_manager=VM,
              loader=DL,
              passwords=passwords,
              stdout_callback=results_callback
          )
    result = TQM.run(play)

    # 定义数据结构
    result_raw = {"success": {}, "failed": {}, "unreachable": {}}
    # 如果成功那么  mycallback.host_ok.items() 才可以遍历,上面的任务肯定能成功所以我们就直接遍历这个
    for host, result in results_callback.host_ok.items():
        result_raw["success"][host] = result._result

    for host, result in results_callback.host_failed.items():
        result_raw["failed"][host] = result._result

    for host, result in results_callback.host_unreachable.items():
        result_raw["unreachable"][host] = result._result

    print(result_raw)


def main():
    adhoc()

if __name__ == "__main__":
   main()

博文来自:www.51niux.com

# python run_call_adhoc.py

{'success': {'192.168.1.164': {'cmd': 'touch /tmp/hoc_test', 'stdout': '', 'stderr': '', 'rc': 0, 'start': '2021-03-09 17:33:35.342455', 'end': '2021-03-09 17:33:35.348890', 
'delta': '0:00:00.006435', 'changed': True, 'invocation': {'module_args': {'_raw_params': 'touch /tmp/hoc_test', '_uses_shell': True, 'warn': False, 'stdin_add_newline': True, 
'strip_empty_ends': True, 'argv': None, 'chdir': None, 'executable': None, 'creates': None, 'removes': None, 'stdin': None}}, 'stdout_lines': [], 'stderr_lines': [], 
'ansible_facts': {'discovered_interpreter_python': '/usr/bin/python'}, '_ansible_no_log': False}}, 'failed': {}, 'unreachable': {}}

执行playbook

# cat  run_call_playbook.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
#from collections import namedtuple
# 核心类
# 用于读取YAML和JSON格式的文件
from ansible.parsing.dataloader import DataLoader
# 用于存储各类变量信息
from ansible.vars.manager import VariableManager
# 用于导入资产文件
from ansible.inventory.manager import InventoryManager
# 操作单个主机信息
from ansible.inventory.host import Host
# 操作单个主机组信息
from ansible.inventory.group import Group
# 存储执行hosts的角色信息
from ansible.playbook.play import Play
# ansible底层用到的任务队列
from ansible.executor.task_queue_manager import TaskQueueManager
# 核心类执行playbook
from ansible.executor.playbook_executor import PlaybookExecutor
# 状态回调,各种成功失败的状态
from ansible.plugins.callback import CallbackBase
# 新增下面两个调用
from ansible.module_utils.common.collections import ImmutableDict
from ansible import context

class ResultsCollectorJSONCallback(CallbackBase):
    """
    playbook的callback改写,格式化输出playbook执行结果
    """
    CALLBACK_VERSION = 2.0

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.task_ok = {}
        self.task_unreachable = {}
        self.task_failed = {}
        self.task_skipped = {}
        self.task_status = {}

    def v2_runner_on_unreachable(self, result):
        """
        重写 unreachable 状态
        :param result:  这是父类里面一个对象,这个对象可以获取执行任务信息
        """
        self.task_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """
        重写 ok 状态
        :param result:
        """
        self.task_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """
        重写 failed 状态
        :param result:
        """
        self.task_failed[result._host.get_name()] = result

    def v2_runner_on_skipped(self, result):
        self.task_skipped[result._host.get_name()] = result

    def v2_playbook_on_stats(self, stats):
        hosts = sorted(stats.processed.keys())
        for h in hosts:
            t = stats.summarize(h)
            self.task_status[h] = {
                "ok": t["ok"],
                "changed": t["changed"],
                "unreachable": t["unreachable"],
                "skipped": t["skipped"],
                "failed": t["failures"]   #注意这里是failures而不是failed,打印下print(t)就知道了
            }


def execplaybook():
    """
    调用playboo大致和调用ad-hoc相同,只是真正调用的是使用PlaybookExecutor
    """
    # 资产配置信息
    DL = DataLoader()
    IM = InventoryManager(loader=DL, sources="/tmp/ansible/test_hosts.txt")
    VM = VariableManager(loader=DL, inventory=IM)

    #执行选项,这个类不是ansible的类,这个的功能就是为了构造参数
    context.CLIARGS = ImmutableDict(connection='smart', module_path=None, verbosity=5,forks=10, become=None, become_method=None,become_user=None, check=False, diff=False,host_key_checking=False,syntax=None,start_at_task=None)
    passwords = dict()  # 这个可以为空,因为在hosts文件中,而且一般也是秘钥登录

    # playbooks参数里面放的就是playbook文件路径
    playbook = PlaybookExecutor(playbooks=["test.yml"], inventory=IM, variable_manager=VM, loader=DL, passwords=passwords)
    callback = ResultsCollectorJSONCallback()
    playbook._tqm._stdout_callback = callback  # 配置callback
    playbook.run()

    result_raw = {"ok": {}, "failed": {}, "unreachable": {}, "skipped": {}, "status": {}}
    for host, result in callback.task_ok.items():
        result_raw["ok"][host] = result._result

    for host, result in callback.task_failed.items():
        result_raw["failed"][host] = result._result

    for host, result in callback.task_unreachable.items():
        result_raw["unreachable"][host] = result._result

    for host, result in callback.task_skipped.items():
        result_raw["skipped"][host] = result._result
    #我先把这两句话注释掉了,不然会有报错,因为没有callback.task_status里面有status的key,打印下print(callback.task_status)  
        #for host, result in callback.task_status.items():
        #    result_raw["status"][host] = result._result        

    print(result_raw)

def main():
    execplaybook()

if __name__ == "__main__":
   main()

# python run_call_playbook.py

{'ok': {'192.168.1.164': {'cmd': "echo 'haha No.1 PLAYBOOK' >>/tmp/hoc_test", 'stdout': '', 'stderr': '', 'rc': 0, 'start': '2021-03-09 18:20:11.097504', 'end': '2021-03-09 18:20:11.102169', 
'delta': '0:00:00.004665', 'changed': True, 'invocation': {'module_args': {'_raw_params': "echo 'haha No.1 PLAYBOOK' >>/tmp/hoc_test", '_uses_shell': True, 'warn': False, 
'stdin_add_newline': True, 'strip_empty_ends': True, 'argv': None, 'chdir': None, 'executable': None, 'creates': None, 'removes': None, 'stdin': None}}, 
'stdout_lines': [], 'stderr_lines': [], '_ansible_no_log': False}}, 'failed': {}, 'unreachable': {}, 'skipped': {}, 'status': {}}

2.4 动态Inventory的使用方式

#动态Inventory的方式就一种直接通过程序获取json信息得到执行的主机列表。也可以试着每次运行ansible的时候生成一个临时hosts文件。

动态生成hosts文件

# cat tmp_hosts.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
通过操作文件形式动态生成ansible的hosts文件
"""

import sys


class Inventory:
    def __init__(self):
        # ansible的hosts文件路径
        self._hostsfile = "/tmp/hosts"
        self._data = self._getInventoryInfo()
        if self._genHostsFile():
            print("创建完成")
        else:
            print("创建失败")

    def _getInventoryInfo(self):
        """
        假设我们已经从数据库中获取资产信息,那么然后拼成下面的信息效果
        """
        hostdata = [
            {
                "Groupname": "TestGroup1",
                "Items": [
                    {
                        "name": "192.168.1.164",
                        "ansible_ssh_host": "192.168.1.164",
                        "ansible_ssh_port": "22",
                        "ansible_ssh_user": "root",
                        "ansible_python_interpreter": "/usr/local/python3"
                    },
                    {
                        "name": "192.168.1.165",
                        "ansible_ssh_host": "192.168.1.165",
                        "ansible_ssh_port": "22",
                        "ansible_ssh_user": "root",
                        "ansible_python_interpreter": "/usr/local/python3"
                    },
                ]
            },
        ]

        return hostdata

    def _genHostsFile(self):
        """
        生成资产hosts文件,生成成功返回True
        """
        try:
            with open(self._hostsfile, "w") as host_file:
                for hosts in self._data:
                    groupname = hosts.get("Groupname")
                    host_file.write("["+groupname+"]\n")
                    for server in hosts.get("Items"):
                        name = server.get("name")
                        ansible_ssh_host = server.get("ansible_ssh_host")
                        ansible_ssh_port = server.get("ansible_ssh_port")
                        ansible_ssh_user = server.get("ansible_ssh_user")
                        ansible_python_interpreter = server.get("ansible_python_interpreter")

                        info = "ansible_ssh_host={0} ansible_ssh_port={1} ansible_ssh_user={2} ansible_python_interpreter={3}".\
                            format(ansible_ssh_host, ansible_ssh_port, ansible_ssh_user, ansible_python_interpreter)
                        line = name + " " + info + "\n"
                        host_file.write(line)
        except Exception as err:
            print(err)
            return False
        return True

def main():
    Inventory()

if __name__ == "__main__":
    try:
        main()
    except Exception as err:
        print(err)
    finally:
        sys.exit()

# python tmp_hosts.py

创建完成

# cat /tmp/hosts   #查看生成的文件

[TestGroup1]
192.168.1.164 ansible_ssh_host=192.168.1.164 ansible_ssh_port=22 ansible_ssh_user=root ansible_python_interpreter=/usr/local/python3
192.168.1.165 ansible_ssh_host=192.168.1.165 ansible_ssh_port=22 ansible_ssh_user=root ansible_python_interpreter=/usr/local/python3

#这样就是两步走,先用程序调用接口啊库啊啥的经过数据处理产生临时的Inventory文件,然后再走另一个执行脚本调用这个Inventory文件。

博文来自:www.51niux.com

程序中直接获取主机并执行

#直接合一起吧,虽然你可以直接调用host去执行,但是很多时候我们需要通过extra_vars传递很多变量剧本中,这时候问题来了,如果通过set_host_variable只能传递一个变量,但是网上说的variable = VariableManager(loader=loader,inventory=inventory)然后variable.extra_vars=字典 是要报错的,因为我们查看帮助这里只是显示变量而非增加extra_vars变量。

#重点来了,下面就说明一下Python API 如果使用extra_vars来给剧本增加额外变量。

# cat run_playbook.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
#from collections import namedtuple
# 核心类
# 用于读取YAML和JSON格式的文件
from ansible.parsing.dataloader import DataLoader
# 用于存储各类变量信息
from ansible.vars.manager import VariableManager
# 用于导入资产文件
from ansible.inventory.manager import InventoryManager
# 操作单个主机信息
from ansible.inventory.host import Host
# 操作单个主机组信息
from ansible.inventory.group import Group
# 存储执行hosts的角色信息
from ansible.playbook.play import Play
# ansible底层用到的任务队列
from ansible.executor.task_queue_manager import TaskQueueManager
# 核心类执行playbook
from ansible.executor.playbook_executor import PlaybookExecutor
# 新增下面两个调用
from ansible.module_utils.common.collections import ImmutableDict
from ansible import context

def execplaybook():
    # 资产配置信息
    DL = DataLoader()
    IM = InventoryManager(loader=DL, sources='')
    IM.add_host(host='192.168.1.164',port=22,group='all')

    #执行选项,这个类不是ansible的类,这个的功能就是为了构造参数
    #重点就是extra_vars={"env_type='online'","host_type='web'"}通过这种方式可以实现extra_vars的增加
    context.CLIARGS = ImmutableDict(connection='smart', module_path=None, verbosity=5,forks=10, become=None, become_method=None,become_user=None, check=False, diff=False,host_key_checking=False,syntax=None,start_at_task=None,extra_vars={"env_type='online'","host_type='web'"})
    passwords = dict()  # 这个可以为空,因为在hosts文件中,而且一般也是秘钥登录
    VM = VariableManager(loader=DL, inventory=IM)   
    print(VM.get_vars())
    # playbooks参数里面放的就是playbook文件路径
    playbook = PlaybookExecutor(playbooks=["test.yml"], inventory=IM, variable_manager=VM, loader=DL, passwords=passwords)
    playbook.run()

def main():
    execplaybook()

if __name__ == "__main__":
   main()

#修改下playbook来进行变量的判断

# cat test.yml

- hosts: all
  tasks:
  - name: test echo
    shell: echo 'haha No.1 PLAYBOOK' >>/tmp/hoc_test
    when:
       - env_type == "online" and host_type == "web"

# python run_playbook.py

image.png

#首先可以看到我们定义的变量是可以读取到的,上面有个WARNING是因为我们没有定义主机文件有一个WARNING,如果想不产生WARNING报警也很简单,就是创建一个空文件,如:IM = InventoryManager(loader=DL, sources='/tmp/ansible/1.txt')

#剩下的输出结果就不贴了,已经执行了。如果变量没匹配上的话就直接跳过了

TASK [test echo] ****************************************************************************************
skipping: [192.168.1.164]

调用脚本动态生成并执行

上面是一次执行一台主机,如果我们想一次执行多台主机呢?除了上面的for循环的方式,下面这种方式也是可以考虑下的。

# cat Inventor_create.py  #先来一个动态脚本,理论上这里面应该是调用数据库啊或者其他api之类的生成一个主机列表的哈,这里从简了

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
"all": {
    "hosts": [
        "192.168.1.224","192.168.1.164","192.168.1.165"
    ]
   }
}
print(json.dumps(json_data,indent=4))

# chmod +x  Inventor_create.py

# python Inventor_create.py  #执行一下看下效果

{
    "all": {
        "hosts": [
            "192.168.1.224",
            "192.168.1.164",
            "192.168.1.165"
        ]
    }
}

# ansible -i Inventor_create.py all -m ping  #可以先这样执行一下,脚本先测好

# cat run_playbook.py  #这里也很简单,只需要将sources那地方修改为我们的动态脚本就可以了

IM = InventoryManager(loader=DL, sources="Inventor_create.py")

# python run_playbook.py

image.png

#从上图可以看到我们执行了三台主机,由于另外两台主机我没有配置秘钥登录所以是不通状态。

博文来自:www.51niux.com

三、Python其他方式调用playbook

#通过上面的例子我们也了解到了,当api的调用方式改变的时候,你就需要跟着改变调用方式,下面是两种不用改变的方式。

3.1 CLI方式

先看下ansible有哪些cli

# python

>>> import ansible

>>> from ansible import cli

>>> help(ansible.cli)

PACKAGE CONTENTS
    adhoc
    arguments (package)
    config
    console
    doc
    galaxy
    inventory
    playbook
    pull
    scripts (package)
    vault

#好其他的我们也不关注了,就关注一下

# cat  cli_playbook.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from ansible.cli.playbook import PlaybookCLI
from ansible.plugins.callback import CallbackBase
import json
from ansible.cli import CLI
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible import context
from ansible import constants as C


class ResultCallback(CallbackBase):
    """ 一个示例回调插件,用于在结果进入时执行操作如果要将所有结果收集到单个对象中以在以下位置进行处理执行结束后,研究利用json回调插件或编写自己的自定义回调插件
    """
    def v2_runner_on_ok(self, result, **kwargs):
        """打印结果的json表示此方法可以将结果存储在实例属性中,以供以后检索
        """
        host = result._host
        print(json.dumps({host.name: result._result}, indent=4))


    def v2_runner_on_failed(self, result, **kwargs):
      host = result._host.get_name()
      self.runner_on_failed(host, result._result, False)
      print('===v2_runner_on_failed====host=%s===result=%s'%(host,result._result))

    def v2_runner_on_unreachable(self, result):
      host = result._host.get_name()
      self.runner_on_unreachable(host, result._result)
      print('===v2_runner_on_unreachable====host=%s===result=%s'%(host,result._result))

    def v2_runner_on_skipped(self, result):
        if C.DISPLAY_SKIPPED_HOSTS:
         host = result._host.get_name()
         self.runner_on_skipped(host, self._get_item(getattr(result._result,'results',{})))
         print("this task does not execute,please check parameter or condition.")

    def v2_playbook_on_stats(self, stats):
      print('===========play executes end========')

cli = PlaybookCLI([" ",'-i', 'test_hosts.txt', 'test.yml','--extra-vars','env_type=online host_type=web'])
super(PlaybookCLI,cli).run()
loader, inventory, variable_manager = cli._play_prereqs()
CLI.get_host_list(inventory, context.CLIARGS['subset'])
PBEX = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory,
                                variable_manager=variable_manager, loader=loader,
                                passwords=None)
PBEX._tqm._stdout_callback = ResultCallback()
PBEX.run()

# python cli_playbook.py   #下面是执行结果可以看到已经playbook也执行了

image.png

3.2 subprocess.Popen调用方式

#我之所以用subprocess.Popen而不是用os.system是因为我要拿返回信息

#下面的例子你只是代码中的一段就当简单说明一下

# cat /root/init/inventory/ecs.py  #先编辑一个获取主机列表的动态脚本

#!/usr/bin/python
# -*- coding: utf-8 -*-
import json
import os
import sys

from core.redis.info import Redis_Manager
group = "all"
InstanceId_list = Redis_Manager().name_list('init_list')
if len(InstanceId_list) > 0:
    init_list = list()
    for InstanceId in InstanceId_list:
        ecs_info_dict = eval(Redis_Manager().select_hash_dn(key='init_hash',dn=InstanceId))
        ecs_ip = ecs_info_dict['PrimaryIpAddress']
        init_list.append(ecs_ip)
hostdata = {group:{"hosts":init_list}}
print (json.dumps(hostdata,indent=4))

#执行出来的内容格式大概是下面的方式:

{
    "all": {
        "hosts": [
            "192.168.1.224"
        ]
    }
}

# cat /root/init/core/aliyun.py  #下面只是执行ansible命令的一部分

init_cmd='ansible-playbook -i %s /etc/ansible/playbooks/roles/k8s_init.yml --extra-vars "ecs_host=%s env_type=%s HostMetadata=%s node_type=%s"' % (base_path+'/init/inventory/'+"ecs.py",ecs_ip,env_type,HostMetadata,node_type)
s_out = subprocess.Popen(init_cmd,shell=True,stdout=subprocess.PIPE)
sout = s_out.communicate()
#从对应的redis列表中删除
Redis_Manager().delete_list_value('init_list',ecs_ip)
print(re.sub(r"\*", "",str(sout)))
if str(sout).find('failed=0') > 0:
    #调用钉钉通知进行信息通知
    status = 'OK'
    message = '机器系统初始化成功'
else:
     #否则就是初始化失败了,对失败信息进行过滤优化其输出格式
     init_faild_info = re.sub(r"\*", "",str(sout))
     #如果输出的信息长度过大对其进行信息截取以防止其超出钉钉通知限制不能报警出来
     if len(sout) > 5000:
        init_faild_info = init_faild_info[-3000:-1]
        #对错误信息进行钉钉通知
        status = 'Faild'
        message = init_faild_info

#上面的方式是用python调用ansible-playbook,这样的好处是不管ansible版本怎么更新你也不用更新你的调用代码,也是比较好维护的,当然如果你想并发的执行的话,可以采用多线程的方式,不好的就是ansible的很多特性用不了,比如消息格式的处理就只能处理获取的stout字符串了。

作者:忙碌的柴少 分类:ansible系列 浏览:258 评论:0
留言列表
发表评论
来宾的头像