#coding: utf-8
# +-------------------------------------------------------------------
# | 宝塔Linux面板
#------------------------------
# API-Demo of Python
#------------------------------
import time,hashlib,sys,os,json
from bt_config import BT
class Bt_api:
    """Small client for the BaoTa (BT) Linux panel HTTP API.

    Every public method sends one signed, form-encoded POST request to the
    panel and returns the JSON-decoded response (``GetDir`` additionally
    reshapes the response into a flat list of entries).

    The panel address and API key default to the values in ``bt_config.BT``.
    To talk to several panels, pass the address and key of each panel when
    instantiating the class.
    """

    # Defaults taken from the project configuration module.
    __BT_KEY = BT.BT_KEY
    __BT_PANEL = BT.BT_URL

    # GetDir classifies files by the LAST THREE characters of their name,
    # which is why the gzip suffix is spelled '.gz'.
    __ARCHIVE_SUFFIXES = ('zip', 'rar', '.gz')
    __IMAGE_SUFFIXES = ('png', 'jpg')
    __VIDEO_SUFFIXES = ('mp4',)

    def __init__(self, bt_panel=None, bt_key=None):
        """Create a client, optionally overriding the configured panel/key.

        Args:
            bt_panel: Base URL of the panel; the configured default is used
                when omitted or empty.
            bt_key: API key of the panel; the configured default is used
                when omitted or empty.
        """
        # Each override is applied independently so that passing only the
        # panel address does not replace the configured key with None.
        if bt_panel:
            self.__BT_PANEL = bt_panel
        if bt_key:
            self.__BT_KEY = bt_key

    # ------------------------------------------------------------------
    # Panel log
    # ------------------------------------------------------------------
    def get_logs(self):
        """Fetch rows of the ``logs`` table (``limit`` is fixed to 10)."""
        return self.__post_json('/data?action=getData', {
            'table': 'logs',
            'limit': 10,
            'tojs': 'test',
        })

    # ------------------------------------------------------------------
    # File management
    # ------------------------------------------------------------------
    def GetDir(self, path):
        """List the contents of ``path`` as a flat list of dicts.

        The response must contain the keys ``DIR`` and ``FILES``, each a list
        of ``;``-separated strings. The fields are read by position as:
        name, size, modification timestamp, permissions, owner.

        Returns:
            Directory entries first, then file entries. Every entry has the
            keys ``category``, ``fifename`` (sic), ``permissions``, ``Owner``
            and ``time`` (local time, ``YYYY-MM-DD HH:MM:SS``); file entries
            additionally have ``path_size``. ``category`` is 1 for
            directories, 2 for other files, 3 for archives, 4 for images and
            5 for videos.

        Raises:
            KeyError: If ``DIR`` or ``FILES`` is missing from the response.
            IndexError, ValueError: If an entry does not have the expected
                fields.
        """
        listing = self.__post_json('/files?action=GetDir', {'path': path})
        directories = [self.__dir_entry(raw) for raw in listing['DIR']]
        files = [self.__file_entry(raw) for raw in listing['FILES']]
        return directories + files

    def GetFileBody(self, path):
        """Request the content of the file at ``path``."""
        return self.__post_json('/files?action=GetFileBody', {'path': path})

    def SaveFileBody(self, data, path):
        """Save ``data`` to the file at ``path`` (encoding sent as utf-8)."""
        return self.__post_json('/files?action=SaveFileBody', {
            'data': data,
            'encoding': 'utf-8',
            'path': path,
        })

    def DeleteFile(self, path):
        """Delete the file at ``path``."""
        return self.__post_json('/files?action=DeleteFile', {'path': path})

    def upload(self, path, f_name, blob):
        """Upload ``blob`` as file ``f_name`` into directory ``path``.

        NOTE(review): ``f_size`` is a hard-coded placeholder and the blob is
        sent as an ordinary url-encoded form field, not multipart - confirm
        that the panel accepts this.
        """
        return self.__post_json('/files?action=upload', {
            'f_path': path,
            'f_name': f_name,
            'f_size': 99999999999999999,
            'f_start': 0,
            'blob': blob,
        })

    def MvFile(self, sfile, dfile):
        """Rename ``sfile`` to ``dfile`` (sent with ``rename=true``)."""
        return self.__post_json('/files?action=MvFile', {
            'sfile': sfile,
            'dfile': dfile,
            'rename': 'true',
        })

    def Zip(self, sfile, dfile, z_type, path):
        """Compress ``sfile`` into ``dfile`` using archive type ``z_type``."""
        return self.__post_json('/files?action=Zip', {
            'sfile': sfile,
            'dfile': dfile,
            'z_type': z_type,
            'path': path,
        })

    # ------------------------------------------------------------------
    # Site management
    # ------------------------------------------------------------------
    def AddDomain(self, domain, webname, id_id):
        """Bind ``domain`` to the site ``webname`` with id ``id_id``."""
        return self.__post_json('/site?action=AddDomain', {
            'domain': domain,
            'webname': webname,
            'id': id_id,
        })

    def GetDirBinding(self, id_id):
        """Fetch the sub-directory bindings of the site with id ``id_id``.

        The file previously defined this method twice with identical bodies
        (the second copy was labelled as fetching the rewrite-rule directory
        but sent the same ``GetDirBinding`` action); one definition is kept.
        """
        return self.__post_json('/site?action=GetDirBinding', {'id': id_id})

    def AddDirBinding(self, id_id, domain, dirName):
        """Bind ``domain`` to sub-directory ``dirName`` of site ``id_id``."""
        return self.__post_json('/site?action=AddDirBinding', {
            'id': id_id,
            'domain': domain,
            'dirName': dirName,
        })

    # Disabled example, kept for reference only (never executed):
    # def create_site(self):
    #     return self.__post_json('/site?action=AddSite', {
    #         'webname': '{"domain":"1.com","domainlist":[],"count":0}',
    #         'ps': "1.com",
    #         'path': '/www/wwwroot/1_com',
    #         'ftp': "true",
    #         'ftp_username': "ftp_1_com",
    #         'ftp_password': "4fxFYbSTbksYMhPE",
    #         'sql': "true",
    #         'codeing': "utf8",
    #         'datauser': "sql_1_com",
    #         'datapassword': "ThHZjXRpyKEH6dA8",
    #         'type': "PHP",
    #         'version': "72",
    #         'type_id': "0",
    #         'port': "80",
    #     })

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def __format_mtime(stamp):
        """Format a Unix timestamp (str or int) as local 'Y-m-d H:M:S'."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(stamp)))

    def __dir_entry(self, raw):
        """Convert one ``;``-separated DIR record into a result dict."""
        fields = raw.split(';')
        return {
            "category": 1,
            "fifename": fields[0],
            "permissions": fields[3],
            "Owner": fields[4],
            "time": self.__format_mtime(fields[2]),
        }

    def __file_entry(self, raw):
        """Convert one ``;``-separated FILES record into a result dict."""
        fields = raw.split(';')
        name = fields[0]
        size = fields[1]
        suffix = name[-3:]

        # Non-archive files report the size string exactly as received.
        path_size = size
        if suffix in self.__ARCHIVE_SUFFIXES:
            category = 3
            size_bytes = int(size)
            if size_bytes < 1024:
                path_size = size + 'B'
            else:
                # Sizes of 1024 bytes and above are reported as a float
                # number of KiB. A size of exactly 1024 used to fall through
                # both comparisons, dropping the file from the listing.
                path_size = size_bytes / 1024
        elif suffix in self.__IMAGE_SUFFIXES:
            category = 4
        elif suffix in self.__VIDEO_SUFFIXES:
            category = 5
        else:
            category = 2

        return {
            "category": category,
            "fifename": name,
            "path_size": path_size,
            "permissions": fields[3],
            "Owner": fields[4],
            "time": self.__format_mtime(fields[2]),
        }

    def __post_json(self, endpoint, params):
        """Sign ``params``, POST them to ``endpoint`` and decode the JSON.

        Args:
            endpoint: Path and query string appended to the panel base URL.
            params: Request-specific form fields; they are added after the
                signature fields.

        Returns:
            The JSON-decoded response body.
        """
        p_data = self.__get_key_data()
        p_data.update(params)
        result = self.__http_post_cookie(self.__BT_PANEL + endpoint, p_data)
        return json.loads(result)

    def __get_md5(self, s):
        """Return the hexadecimal MD5 digest of the text ``s`` (UTF-8)."""
        return hashlib.md5(s.encode('utf-8')).hexdigest()

    def __get_key_data(self):
        """Build the signature fields that accompany every request.

        ``request_token`` is md5(str(request_time) + md5(api_key)).
        """
        now_time = int(time.time())
        token = self.__get_md5(str(now_time) + self.__get_md5(self.__BT_KEY))
        return {
            'request_token': token,
            'request_time': now_time,
        }

    def __http_post_cookie(self, url, p_data, timeout=1800):
        """Send a POST request, persisting cookies between calls.

        Cookies are loaded from and saved to ``./<md5 of panel URL>.cookie``
        in the current working directory.

        Args:
            url: URL to request.
            p_data: POST parameters, either a dict (url-encoded here) or an
                already encoded string/bytes body.
            timeout: Timeout in seconds (default 1800).

        Returns:
            The response body as a string.
        """
        # Imported locally (as the original did) so the block is
        # self-contained; urllib.parse is imported explicitly instead of
        # relying on urllib.request pulling it in.
        import http.cookiejar
        import urllib.parse
        import urllib.request

        cookie_file = './' + self.__get_md5(self.__BT_PANEL) + '.cookie'
        cookie_obj = http.cookiejar.MozillaCookieJar(cookie_file)
        if os.path.exists(cookie_file):
            cookie_obj.load(cookie_file, ignore_discard=True, ignore_expires=True)

        if isinstance(p_data, bytes):
            data = p_data
        elif isinstance(p_data, str):
            # The documented contract allows a pre-encoded string body,
            # which urlencode() itself would reject.
            data = p_data.encode('utf-8')
        else:
            data = urllib.parse.urlencode(p_data).encode('utf-8')

        handler = urllib.request.HTTPCookieProcessor(cookie_obj)
        opener = urllib.request.build_opener(handler)
        req = urllib.request.Request(url, data)
        # The context manager closes the connection even if reading fails.
        with opener.open(req, timeout=timeout) as response:
            cookie_obj.save(ignore_discard=True, ignore_expires=True)
            result = response.read()

        if isinstance(result, bytes):
            result = result.decode('utf-8')
        return result
# Usage example (disabled):
# if __name__ == '__main__':
#     # Instantiate the BT panel API object
#     my_api = Bt_api()
#     # Call the get_logs method
#     r_data = my_api.get_logs()
#     # Print the response data
#     print(r_data)
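# (Residue of the web page this file was copied from; kept as comments so the module parses.)
# © Copyright notice
# THE END
# No comments yet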