本文尝试基于文心一言AI大模型,编写一段python3程序以获取华为分布式块存储REST接口的实时数据。
一、用文心一言AI大模型将需求转化为样例代码
1、第一次对话:“python3写一段从rest服务器获取数据的样例代码”
同时生成了以下注解
这段代码首先定义了一个函数
get_data_from_rest_server
,该函数使用requests.get
方法向指定的 URL 发送 GET 请求。如果请求成功(即响应的状态码为 200),则将响应的 JSON 数据返回。否则,它将打印出错误消息并返回 None。然后,我们调用这个函数来获取数据,并打印出获取到的数据。注意你需要将url
替换为你的 REST 服务器的实际 URL。
2、第二次对话:“以上请求需带入服务器的认证参数”
同时生成了以下注解
在上述代码中,我们通过在
requests.get
方法中传递auth=(username, password)
来提供服务器的认证参数。这样,当你发送请求时,requests
会自动将认证参数添加到请求中,从而完成身份验证过程。
3、第三次对话:“以上代码需代入请求消息头”
import requests
# 定义服务器认证参数的用户名和密码
username = 'your_username'
password = 'your_password'
def get_data_from_rest_server():
# 将此 URL 替换为你的 REST 服务器 URL
url = 'http://example.com/api/data'
# 定义请求头,可以包含自定义的头部字段和值
headers = {
'Content-Type': 'application/json', # 声明请求的内容类型为 JSON
'Authorization': f'Basic {username}:{password}', # 添加基本的身份验证头部字段
}
# 使用 GET 方法请求数据,并携带服务器认证参数和请求头
response = requests.get(url, headers=headers)
# 检查响应的状态码,如果请求成功(状态码为 200),则处理响应的数据
if response.status_code == 200:
data = response.json()
return data
else:
print(f"请求失败,状态码: {response.status_code}")
return None
# 调用函数获取数据
data = get_data_from_rest_server()
if data is not None:
print(data)
注解如下 :
在上述代码中,我们通过创建一个字典对象来定义请求头,其中包含了一个
Content-Type
字段和一个Authorization
字段。Content-Type
字段声明了请求的内容类型为 JSON,而Authorization
字段包含了基本的身份验证值。然后,我们将这个字典对象传递给requests.get
方法的headers
参数,从而将请求头添加到请求中。
通过和文心一言AI大模型的三次对话,我们大致了解了用python 的 requests
库从 REST 服务器获取数据的基本方法。
二、核查华为分布式块存储REST接口规范
1、RESTful API的请求必须包括如下消息头
2、拟测试请求接口官方样例
三、代码实写
#!/usr/bin/python3
#coding=utf-8
##__author__='daijianbing'
import requests
import json
def authenticate(username, password,url_auth):
# 构建认证数据
auth_data = {
'user_name': username,
'password': password
}
# 将认证数据转换为JSON格式
auth_json = json.dumps(auth_data)
#禁止未经验证的HTTPS
requests.packages.urllib3.disable_warnings()
# 发送POST请求到REST API进行身份验证
response = requests.post(url_auth,verify = False, data=auth_json, headers={'Content-Type': 'application/json'})
# 检查响应状态码,如果成功则返回认证令牌,否则抛出异常
if response.status_code == 200:
#print(response.json())
return response.json()['data']['x_auth_token']
else:
connect = 0
raise Exception('Authentication failed')
def queryStoragePool(token,url):
# 构建认证数据
headers = {
'Connection': 'keep-alive',
'X-Auth-Token': token
}
#print(headers)
# 发送请求到REST API
response = requests.get(url, verify = False, headers=headers)
# 检查响应状态码,如果成功则返回认证令牌,否则抛出异常
if response.status_code == 200:
#print(response.json())
return response.json()['storagePools']
else:
raise Exception('Authentication failed')
def queryPoolstatus(token,url,poolId):
# 构建认证数据
headers = {
'Connection': 'keep-alive',
'X-Auth-Token': token
}
params = {
'poolId':str(poolId),
'vbs': 'true'
}
params_json = json.dumps(params)
#print("url:",url,"headers:",headers,"params_json:",params_json)
# 发送T请求到REST API
response = requests.get(url, verify = False, params=params, headers=headers)
# 检查响应状态码,如果成功则返回认证令牌,否则抛出异常
if response.status_code == 200:
#print(response.json())
return response.json()['status']
else:
raise Exception('Authentication failed')
def queryAlarms(token,url):
# 构建认证数据
headers = {
'Connection': 'keep-alive',
'X-Auth-Token': token
}
params = {
'filter':"alarmStatus::1",
}
params_json = json.dumps(params)
# 发送请求到REST API
response = requests.get(url, verify = False, params=params, headers=headers)
# 检查响应状态码,如果成功则返回认证令牌,否则抛出异常
if response.status_code == 200:
#print(response.json())
return response.json()['data']
else:
raise Exception('Authentication failed')
# 使用示例
username = 'zabbix'
password = 'passwd'
hostip = '192.168.0.1'
port = '8088'
hostport = "https://"+hostip+":"+port
url_auth = hostport+'/api/v2/aa/sessions'
checkstatus = 1
requests.packages.urllib3.disable_warnings()
try:
token = authenticate(username, password, url_auth)
print('x_auth_token:', token)
url_pool = hostport+'/dsware/service/resource/queryStoragePool'
pools = queryStoragePool(token,url_pool)
url_poolstatus=hostport+'/dsware/service/cluster/storagepool/queryStatus'
for pool in pools:
poolstatus=queryPoolstatus(token,url_poolstatus,pool['poolId'])
url_alarms = hostport+'/api/v2/common/alarm_count'
Alarms=queryAlarms(token,url_alarms)
print('poolid:',pool['poolId'],'poolname:',pool['poolName'],'poolstatus:',poolstatus,'alarmcount:',Alarms['count'])
except:
checkstatus = 0
finally:
print("checkstatus:",checkstatus)
print("chech over")
四、验证运行
实际运行:文章来源:https://www.toymoban.com/news/detail-774466.html
# python3 --version
Python 3.9.9
# python3 healthck.py
x_auth_token: MzMwMDE0OTkyNkEzMDAxNGUwOTcwMzE2
poolid: 0 poolname: Pool01 poolstatus: 0 alarmcount: 0
checkstatus: 1
chech over
本段代码实现了存储REST接口认证登录,查询全部的pool并列出poolid,poolname,pool状态,输出当前未恢复告警数并返回本次的检查状态(1为检查成功)。文章来源地址https://www.toymoban.com/news/detail-774466.html
到了这里,关于基于文心一言AI大模型,编写一段python3程序以获取华为分布式块存储REST接口的实时数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!