本文共 3289 字,大约阅读时间需要 10 分钟。
https://github.com/hequan2017/zabbix-models/tree/master/ansible_run
只是简单改了一下 能够单独使用。
目录结构
ansible_run/├── callback.py├── exceptions.py├── __init__.py├── inventory.py├── runner.py├── test_inventory.py└── test_runner.py
下面两个是演示文件
先 pip3 install ansible==2.4.2.0 安装
例子
# -*- coding: utf-8 -*-from runner import AdHocRunner, CommandRunnerfrom inventory import BaseInventorydef TestAdHocRunner(): """ 以yml的形式 执行多个命令 :return: """ host_data = [ { "hostname": "testserver", "ip": "192.168.10.93", "port": 22, "username": "root", "password": "password", }, ] inventory = BaseInventory(host_data) runner = AdHocRunner(inventory) tasks = [ { "action": { "module": "cron","args": "name=\"sync time\" minute=\"*/3\" job=\"/usr/sbin/ntpdate time.nist.gov &> /dev/null\"" }, "name": "run_cmd"}, { "action": { "module": "shell", "args": "whoami"}, "name": "run_whoami"}, ] ret = runner.run(tasks, "all") print(ret.results_summary) print(ret.results_raw)def TestCommandRunner(): """ 执行单个命令,返回结果 :return: """ host_data = [ { "hostname": "testserver", "ip": "192.168.10.93", "port": 22, "username": "root", "password": "password", }, ] inventory = BaseInventory(host_data) runner = CommandRunner(inventory) res = runner.execute('pwd', 'all') print(res.results_command) print(res.results_raw) print(res.results_command['testserver']['stdout'])if __name__ == "__main__": TestAdHocRunner() TestCommandRunner()
from inventory import BaseInventorydef Test(): """ 返回主机信息,组信息,组内主机信息 :return: """ host_list = [{ "hostname": "testserver1", "ip": "102.1.1.1", "port": 22, "username": "root", "password": "password", "private_key": "/tmp/private_key", "become": { "method": "sudo", "user": "root", "pass": None, }, "groups": ["group1", "group2"], "vars": { "sexy": "yes"}, }, { "hostname": "testserver2", "ip": "8.8.8.8", "port": 2222, "username": "root", "password": "password", "private_key": "/tmp/private_key", "become": { "method": "su", "user": "root", "pass": "123", }, "groups": ["group3", "group4"], "vars": { "love": "yes"}, }] inventory = BaseInventory(host_list=host_list) print("#"*10 + "Hosts" + "#"*10) for host in inventory.hosts: print(host) print("#" * 10 + "Groups" + "#" * 10) for group in inventory.groups: print(group) print("#" * 10 + "all group hosts" + "#" * 10) group = inventory.get_group('all') print(group.hosts)if __name__ == '__main__': Test()