1.适用版本适用于RPA2020.4及以上版本,2.接口API手册调用方法和字段请参考论坛手册:它还没有在论坛上公开。请从当地制造商处获得。3.Python调用代码如果你想使用机器人来调
适用于RPA2020.4及以上版本,
2.接口API手册调用方法和字段请参考论坛手册:
它还没有在论坛上公开。请从当地制造商处获得。
3.Python调用代码如果你想使用机器人来调整API接口,请参考下面的代码。其他语言的逻辑是一样的:
import jsonimport timeimport requestsfrom urllib import parseimport hmacimport base64from hashlib import sha256def rpa_rest_2020_4(host='',rest_type='',data_json=None,mode='',port=443,accesstoken=None,retry=2):????'''????host:地址,str型,示例:'https://192.168.202.11'????rest_type:str型,接口类型,示例:'/oapi/v1/job/create'????data_json:字典型,发送的报文数据json格式????mode:接口请求方式(get、post、delete及put)????port:int型,https端口????retry:int型,重试次数????返回值:get_field_json:字典型,????'''????get_field_json={'code': 40, 'msg': 'fail,意外情况','result':None}????#url参数转换????def json2Params(data_json):????????get_field_json={'code': 41, 'msg': 'fail,转换URL参数失败!','result':None}????????try:????????????data_json=json.loads(data_json)????????????url_str = ''????????????nums = 0????????????max_nums = len(data_json)????????????for key in data_json:????????????????nums += 1????????????????if nums == max_nums:????????????????????url_str += str(key) + '=' + str(data_json[key])????????????????else:????????????????????????????????url_str += str(key) + '=' + str(data_json[key]) + '&'????????except Exception as e:????????????print('参数转化失败:',e)????????????url_str=''????????finally:????????????return url_str????if mode != 'get' and mode != 'put' and mode != 'post' and mode != 'delete':????????get_field_json={'code': 42, 'msg': 'fail,mode错误,只能为get、put、post或delete!','result':None}????????return get_field_json????#获得签名sign????sign = ''????sign_yc = ''????url = host????if port != 443:????????url = url + ':' + str(port)????url += rest_type????timestamp = None????#获取签名值sign:????try:????????get_field_json={'code': 41, 'msg': 'fail,获取签名失败!','result':None}????????#获得毫秒级时间戳(时间出入不能大于10分钟)????????timestamp=str(int(round(time.time() * 1000)))????????#处理json数据不为str类型的情况????????for key in data_json:????????????data_json[key]=str(data_json[key])????????#json排序????????data_json=json.dumps(data_json,sort_keys=True)????????#构造源串sign_yc????????if mode == 'get':????????????json_str=json2Params(data_json)????????????sign_yc=json_str + '&token=' + accesstoken + '×tamp=' + timestamp????????????url = url+'?'+json_str????????else:????????????sign_yc=str(data_json) + '&token=' + accesstoken + '×tamp=' + timestamp????????print('源串:',sign_yc)????????print('URL:',url)????????#URL编码????????url_bm = parse.quote(sign_yc, encoding="utf-8")????????#sha256加密密码????????byte_key = bytes(accesstoken, encoding="utf-8")????????byte_url_bm = bytes(url_bm, encoding="utf-8")????????hn256 = hmac.new(byte_key, byte_url_bm, digestmod=sha256)????????hh256 = hn256.hexdigest()????????#Base64编码????????bb64 = base64.b64encode(bytes(hh256, encoding="utf-8"))????????#获取sign????????sign = str(bb64, "utf-8")????????#替换rn????????sign=sign.replace('rn', '')????????print('签名值sign:',sign)????except Exception as e:????????print(e)????????return get_field_json????#开始尝试发送API????print('开始尝试发送API')????for count in range(retry):????????print('尝试第',count+1,'次API请求任务。')????????try:????????????print('开始尝试:',mode)????????????#post、delete、put????????????if mode!='get':????????????????header_dict = {'Content-Type': 'application/json;charset=UTF-8','accesstoken':accesstoken,'timestamp':timestamp,'signature':sign}????????????????if mode=='post':????????????????????if rest_type == '/oapi/v1/job/create':????????????????????????header_dict = {'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8','accesstoken':accesstoken,'timestamp':timestamp,'signature':sign}????????????????????????json_str=json2Params(data_json)????????????????????????url = url+'?'+json_str????????????????????????res = requests.post(url, data=str(data_json), headers=header_dict,verify=False)????????????????????else:????????????????????????json_str=json2Params(data_json)????????????????????????url = url+'?'+json_str????????????????????????header_dict = {'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8','accesstoken':accesstoken,'timestamp':timestamp,'signature':sign}????????????????????????res = requests.post(url, data=str(data_json), headers=header_dict,verify=False)????????????????if mode=='put':????????????????????res = requests.put(url, data=str(data_json), headers=header_dict,verify=False)????????????????if mode=='delete':????????????????????res = requests.delete(url, data=str(data_json), headers=header_dict,verify=False)????????????if mode=='get':????????????????header_dict = {'accesstoken':accesstoken,'timestamp':timestamp,'signature':sign}????????????????res = requests.get(url,headers=header_dict,verify=False)????????????#获取返回值????????????res_text=res.text????????????#转化成json????????????get_field_json=json.loads(res_text)????????????print('请求成功!')????????????break????????except Exception as e:????????????print('请求失败:',e)????????????get_field_json={'code': 40, 'msg': 'fail,发送API失败!','result':None}????return get_field_jsondef get_token(host='',port='',accessKey='',secretKey='',retry=2):????'''????host:地址,str型,示例:'https://192.168.202.11'????port:int型,https端口????accessKey:str型,服务平台应用AppKey????secretKey:str型,服务平台应用AppSecret????retry:int型,重试次数????'''????get_field_json={'code': 44, 'msg': 'fail,获取token失败','result':None}????for i in range(retry):????????try:????????????url = host????????????if port != 443:????????????????url = url + ':' + str(port)????????????json_str='accessKey='+accessKey+'&secretKey='+secretKey????????????url = url+'/oapi/v1/token?'+json_str????????????res = requests.get(url,verify=False)????????????res_text=res.text????????????#转化成json????????????get_field_json=json.loads(res_text)????????????print('获取token,第'+str(i+1)+'次,成功')????????????break????????except Exception as e:????????????print('获取token,第'+str(i+1)+'次,失败',e)????????????time.sleep(1)????return get_field_json????????def refresh_token(host='',port='',refleshtoken='',retry=2):????'''????host:地址,str型,示例:'https://192.168.202.11'????port:int型,https端口????refleshtoken:str型,刷新token????retry:int型,重试次数????'''????get_field_json={'code': 44, 'msg': 'fail,刷新token失败','result':None}????for i in range(retry):????????try:????????????url = host????????????if port != 443:????????????????url = url + ':' + str(port)????????????json_str='refreshToken='+refleshtoken????????????url = url+'/oapi/v1/token?'+json_str????????????res = requests.get(url,verify=False)????????????res_text=res.text????????????#转化成json????????????get_field_json=json.loads(res_text)????????????print('刷新token,第'+str(i+1)+'次,成功')????????????break????????except Exception as e:????????????print('刷新token,第'+str(i+1)+'次,失败',e)????????????time.sleep(1)????return get_field_json
4.来自其他平台或客户端的调用4.1其他平台调用根据第四章的逻辑,编写自己的调用代码。
4.2机器人呼叫根据第4章添加一个全局函数,在需要调用的地方使用全局函数控件。
注意:您需要提前获得令牌并调用它。如果没有第三方平台对接,获得的令牌可以存储在一个共享变量中。分配调用它的权限。请参考下图: