通知设置 新通知
python解析windows日志文件,查询服务器是否被人攻击
李魔佛 发表了文章 • 0 个评论 • 80 次浏览 • 2021-01-17 23:49
最终效果:
【MD,老毛子就是天天在扫描,爆破密码,即使改了端口还是在枚举】
大致代码如下:
import mmap
import contextlib
from Evtx.Evtx import FileHeader
from Evtx.Views import evtx_file_xml_view
from xml.dom import minidom
from ip_convertor import IP
import re
class WindowsLogger():
    """Read an exported Windows event log (.evtx) and report selected events.

    Attributes set by ``__init__``:
      path     -- location of the .evtx file.
      formator -- template used by ``parse_log_detail`` for each printed row.
    """

    def __init__(self, path):
        self.path = path
        self.formator = 'IP:{:10}\tlocation:{:20}\tUser:{:15}\tProcess:{}'

    def read_file(self):
        """Return a ``FileHeader`` built over a read-only mmap of ``self.path``.

        The original wrapped the mapping in ``contextlib.closing`` and
        returned from inside the ``with`` block, so the caller received a
        header whose buffer was already closed.  The mapping is now left
        open; it is released when the returned header is garbage-collected.
        """
        with open(self.path, 'rb') as f:
            # The mapping stays usable after the file object is closed.
            buf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        return FileHeader(buf, 0)

    def parse_log_detail(self, filteID):
        """Print one formatted row per event whose ID equals ``filteID``."""
        with open(self.path, 'rb') as f:
            with contextlib.closing(mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)) as buf:
                fh = FileHeader(buf, 0)
                for xml, record in evtx_file_xml_view(fh):
                    # Only events matching filteID (e.g. 4624) are printed.
                    for IpAddress, ip, targetUsername, ProcessName in self.filter_event(xml, filteID):
                        print(self.formator.format(IpAddress, ip, targetUsername, ProcessName))

    # Drop the events we do not care about and emit the interesting ones.
    def filter_event(self, xml, EventID, use_filter=True):
        """Yield ``(IpAddress, location, TargetUserName, ProcessName)`` tuples.

        :param xml: XML text containing one or more ``Event`` elements.
        :param EventID: event ID to keep; compared as text, so ``4624`` and
            ``'4624'`` are equivalent.
        :param use_filter: when false, every event is yielded regardless of
            its ID (the original yielded nothing at all in that case).

        Fields missing from an event are reported as ``''`` instead of
        raising or silently reusing the value of the previous event.
        ``location`` is resolved through ``IP(...)`` only when the address
        starts with a digit.
        """
        xmldoc = minidom.parseString(xml)
        wanted = str(EventID)
        for evt in xmldoc.getElementsByTagName('Event'):
            # Read the event ID from the EventID node.
            id_nodes = evt.getElementsByTagName('EventID')
            if not id_nodes or not id_nodes[0].childNodes:
                continue
            eventId = id_nodes[0].childNodes[0].data
            if use_filter and eventId != wanted:
                continue

            fields = {'IpAddress': '', 'TargetUserName': '', 'ProcessName': ''}
            # Not every event carries an EventData section.
            containers = evt.getElementsByTagName('EventData')
            data_nodes = containers[0].getElementsByTagName('Data') if containers else []
            for data in data_nodes:
                name = data.getAttribute('Name')
                # An empty <Data/> element has no text child.
                if name in fields and data.childNodes:
                    fields[name] = data.childNodes[0].data

            IpAddress = fields['IpAddress']
            ip = ''
            if re.search(r'^\d+', IpAddress):
                ip = IP(IpAddress).ip_address
            yield IpAddress, ip, fields['TargetUserName'], fields['ProcessName']
def main():
    """Entry point: print the event-ID 4624 rows of the exported log file."""
    WindowsLogger(r'D:\share\1.evtx').parse_log_detail('4624')


if __name__ == '__main__':
    main()
D:\share\1.evtx 为日志导出文件
原创文章,转载请注明出处:
http://30daydo.com/article/44130
完整代码,可以通过公众号回复: windows日志解析获取
查看全部
最终效果:

【MD,老毛子就是天天在扫描,爆破密码,即使改了端口还是在枚举】
大致代码如下:
import mmap
import contextlib
from Evtx.Evtx import FileHeader
from Evtx.Views import evtx_file_xml_view
from xml.dom import minidom
from ip_convertor import IP
import re
class WindowsLogger():
    """Read an exported Windows event log (.evtx) and report selected events.

    Attributes set by ``__init__``:
      path     -- location of the .evtx file.
      formator -- template used by ``parse_log_detail`` for each printed row.
    """

    def __init__(self, path):
        self.path = path
        self.formator = 'IP:{:10}\tlocation:{:20}\tUser:{:15}\tProcess:{}'

    def read_file(self):
        """Return a ``FileHeader`` built over a read-only mmap of ``self.path``.

        The original wrapped the mapping in ``contextlib.closing`` and
        returned from inside the ``with`` block, so the caller received a
        header whose buffer was already closed.  The mapping is now left
        open; it is released when the returned header is garbage-collected.
        """
        with open(self.path, 'rb') as f:
            # The mapping stays usable after the file object is closed.
            buf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        return FileHeader(buf, 0)

    def parse_log_detail(self, filteID):
        """Print one formatted row per event whose ID equals ``filteID``."""
        with open(self.path, 'rb') as f:
            with contextlib.closing(mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)) as buf:
                fh = FileHeader(buf, 0)
                for xml, record in evtx_file_xml_view(fh):
                    # Only events matching filteID (e.g. 4624) are printed.
                    for IpAddress, ip, targetUsername, ProcessName in self.filter_event(xml, filteID):
                        print(self.formator.format(IpAddress, ip, targetUsername, ProcessName))

    # Drop the events we do not care about and emit the interesting ones.
    def filter_event(self, xml, EventID, use_filter=True):
        """Yield ``(IpAddress, location, TargetUserName, ProcessName)`` tuples.

        :param xml: XML text containing one or more ``Event`` elements.
        :param EventID: event ID to keep; compared as text, so ``4624`` and
            ``'4624'`` are equivalent.
        :param use_filter: when false, every event is yielded regardless of
            its ID (the original yielded nothing at all in that case).

        Fields missing from an event are reported as ``''`` instead of
        raising or silently reusing the value of the previous event.
        ``location`` is resolved through ``IP(...)`` only when the address
        starts with a digit.
        """
        xmldoc = minidom.parseString(xml)
        wanted = str(EventID)
        for evt in xmldoc.getElementsByTagName('Event'):
            # Read the event ID from the EventID node.
            id_nodes = evt.getElementsByTagName('EventID')
            if not id_nodes or not id_nodes[0].childNodes:
                continue
            eventId = id_nodes[0].childNodes[0].data
            if use_filter and eventId != wanted:
                continue

            fields = {'IpAddress': '', 'TargetUserName': '', 'ProcessName': ''}
            # Not every event carries an EventData section.
            containers = evt.getElementsByTagName('EventData')
            data_nodes = containers[0].getElementsByTagName('Data') if containers else []
            for data in data_nodes:
                name = data.getAttribute('Name')
                # An empty <Data/> element has no text child.
                if name in fields and data.childNodes:
                    fields[name] = data.childNodes[0].data

            IpAddress = fields['IpAddress']
            ip = ''
            if re.search(r'^\d+', IpAddress):
                ip = IP(IpAddress).ip_address
            yield IpAddress, ip, fields['TargetUserName'], fields['ProcessName']
def main():
    """Entry point: print the event-ID 4624 rows of the exported log file."""
    WindowsLogger(r'D:\share\1.evtx').parse_log_detail('4624')


if __name__ == '__main__':
    main()
D:\share\1.evtx 为日志导出文件
原创文章,转载请注明出处:
http://30daydo.com/article/44130
完整代码,可以通过公众号回复: windows日志解析获取
茅台抢购程序 京东 苏宁
李魔佛 发表了文章 • 0 个评论 • 1494 次浏览 • 2021-01-05 22:34
运行环境 windows,linux,mac,python3+
京东小白分查询:
https://plus.m.jd.com/rights/windControl
分太低的就不要参与了,毕竟概率会小很多
############ 2021-01-13 更新 ======
最新的用Go重写的,搞了几瓶
苏宁家的:
============= 2021-01-11 更新 ============
感觉苏宁的抢购是耍猴的,那个按钮基本处于不可点状态,所以就放弃了,感觉官方就是没放多少量,加上苏宁公司过往的尿性,所以洗洗睡了
main.py
import sys
from maotai.jd_spider_requests import ProdectPurchase
if __name__ == '__main__':
    # Show the menu, read the user's choice and dispatch to the matching action.
    tip = """
功能列表:
1.预约商品
2.秒杀抢购商品
"""
    print(tip)
    product = ProdectPurchase()
    choice_function = input('请选择:')
    # Reject unknown choices first, then handle the two valid ones.
    if choice_function not in ('1', '2'):
        print('没有此功能')
        sys.exit(1)
    if choice_function == '1':
        product.reserve()
    else:
        product.seckill_by_proc_pool()
jd_spider_requests.py
import random
import time
import requests
import functools
import json
import os
import pickle
from lxml import etree
from error.exception import SKException
from maotai.jd_logger import logger
from maotai.timer import Timer
from maotai.config import global_config
from concurrent.futures import ProcessPoolExecutor
from helper.jd_helper import (
parse_json,
send_wechat,
wait_some_time,
response_status,
save_image,
open_image
)
class SpiderSession:
    """
    Owns the shared HTTP session and saves/restores its cookies on disk.
    """

    def __init__(self):
        self.cookies_dir_path = "./cookies/"
        self.user_agent = global_config.getRaw('config', 'DEFAULT_USER_AGENT')
        self.session = self._init_session()

    def _init_session(self):
        """Create the session and install the default headers on it."""
        session = requests.session()
        session.headers = self.get_headers()
        return session

    def get_headers(self):
        """Return the default request headers (User-Agent, Accept, Connection)."""
        return {"User-Agent": self.user_agent,
                "Accept": "text/html,application/xhtml+xml,application/xml;"
                          "q=0.9,image/webp,image/apng,*/*;"
                          "q=0.8,application/signed-exchange;"
                          "v=b3",
                "Connection": "keep-alive"}

    def get_user_agent(self):
        """Return the configured User-Agent string."""
        return self.user_agent

    def get_session(self):
        """
        Return the current session.
        :return: the session created by ``_init_session``
        """
        return self.session

    def get_cookies(self):
        """
        Return the cookies held by the current session.
        :return: the session's cookie container
        """
        return self.get_session().cookies

    def set_cookies(self, cookies):
        """Merge ``cookies`` into the session's cookie container."""
        self.session.cookies.update(cookies)

    def load_cookies_from_local(self):
        """
        Load cookies from the first ``*.cookies`` file in the cookie directory.
        :return: True when a cookie file was loaded, False when the directory
                 is missing or contains no cookie file (the original returned
                 None on success, so callers could not tell the cases apart)
        """
        if not os.path.isdir(self.cookies_dir_path):
            return False
        for name in os.listdir(self.cookies_dir_path):
            if name.endswith(".cookies"):
                cookies_file = os.path.join(self.cookies_dir_path, name)
                break
        else:
            return False
        # NOTE(security): pickle.load can execute arbitrary code. Only load
        # cookie files that this program wrote itself.
        with open(cookies_file, 'rb') as f:
            local_cookies = pickle.load(f)
        self.set_cookies(local_cookies)
        return True

    def save_cookies_to_local(self, cookie_file_name):
        """
        Pickle the session cookies to ``<cookies_dir_path>/<name>.cookies``.
        :param cookie_file_name: base name (without extension) of the cookie file
        :return: None
        """
        cookies_file = os.path.join(
            self.cookies_dir_path, '{}.cookies'.format(cookie_file_name))
        directory = os.path.dirname(cookies_file)
        if directory:
            # exist_ok avoids the check-then-create race of the original.
            os.makedirs(directory, exist_ok=True)
        with open(cookies_file, 'wb') as f:
            pickle.dump(self.get_cookies(), f)
class QrLogin:
"""
扫码登录
"""
def __init__(self, spider_session: SpiderSession):
"""
初始化扫码登录
大致流程:
1、访问登录二维码页面,获取Token
2、使用Token获取票据
3、校验票据
:param spider_session:
"""
self.qrcode_img_file = 'qr_code.png'
self.spider_session = spider_session
self.session = self.spider_session.get_session()
self.is_login = False
self.refresh_login_status()
def refresh_login_status(self):
"""
刷新是否登录状态
:return:
"""
self.is_login = self._validate_cookies()
def _validate_cookies(self):
    """
    Check whether the session cookies are still valid (i.e. logged in).
    The order-list page is requested without following redirects; per the
    original note, a logged-out user is redirected to the login page, so
    only a plain 200 answer counts as logged in.
    :return: True when the cookies are valid, otherwise False
    """
    url = 'https://order.jd.com/center/list.action'
    payload = {
        'rid': str(int(time.time() * 1000)),
    }
    try:
        resp = self.session.get(url=url, params=payload, allow_redirects=False)
    except Exception as e:
        # The original passed ``e`` with no placeholder in the message, which
        # makes the logging module fail while formatting the record.
        logger.error("验证cookies是否有效发生异常: %s", e)
        return False
    return resp.status_code == requests.codes.OK
def _get_login_page(self):
"""
获取PC端登录页面
阻塞,更新cookies
:return:
"""
url = "https://passport.jd.com/new/login.aspx"
page = self.session.get(url, headers=self.spider_session.get_headers())
return page
def _get_qrcode(self):
"""
缓存并展示登录二维码
:return:
"""
url = 'https://qr.m.jd.com/show'
payload = {
'appid': 133,
'size': 147,
't': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.info('获取二维码失败')
return False
save_image(resp, self.qrcode_img_file)
logger.info('二维码获取成功,请打开京东APP扫描')
open_image(self.qrcode_img_file)
return True
def _get_qrcode_ticket(self):
"""
通过 token 获取票据 ticket
:return:
"""
url = 'https://qr.m.jd.com/check'
payload = {
'appid': '133',
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'token': self.session.cookies.get('wlfstk_smdl'), # 从cookies获取值
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.error('获取二维码扫描结果异常')
return False
resp_json = parse_json(resp.text)
if resp_json['code'] != 200:
logger.info('Code: %s, Message: %s', resp_json['code'], resp_json['msg'])
return None
else:
logger.info('已完成手机客户端确认')
return resp_json['ticket']
def _validate_qrcode_ticket(self, ticket):
"""
通过已获取的票据进行校验
:param ticket: 已获取的票据
:return:
"""
url = 'https://passport.jd.com/uc/qrCodeTicketValidation'
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/uc/login?ltype=logout',
}
resp = self.session.get(url=url, headers=headers, params={'t': ticket})
if not response_status(resp):
return False
resp_json = json.loads(resp.text)
if resp_json['returnCode'] == 0:
return True
else:
logger.info(resp_json)
return False
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
self._get_login_page() # 更新cookies
# download QR code
if not self._get_qrcode():
raise SKException('二维码下载失败')
# get QR code ticket
ticket = None
retry_times = 85
for _ in range(retry_times):
# 重试 拿到ticket
ticket = self._get_qrcode_ticket()
if ticket:
break
time.sleep(2)
else:
raise SKException('二维码过期,请重新获取扫描')
# validate QR code ticket
if not self._validate_qrcode_ticket(ticket):
raise SKException('二维码信息校验失败')
self.refresh_login_status()
logger.info('二维码登录成功')
class ProdectPurchase(object):
def __init__(self):
self.spider_session = SpiderSession()
self.spider_session.load_cookies_from_local()
# 共享一个session
self.qrlogin = QrLogin(self.spider_session)
# 初始化信息
self.sku_id = global_config.getRaw('config', 'sku_id')
self.seckill_num = global_config.getRaw('config', 'seckill_num')
self.work_count = global_config.getRaw('config','process_num')
self.seckill_init_info = dict()
self.seckill_url = dict()
self.seckill_order_data = dict()
self.timers = Timer()
self.session = self.spider_session.get_session()
self.user_agent = self.spider_session.user_agent
self.nick_name = None
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
if self.qrlogin.is_login:
logger.info('登录成功')
return
self.qrlogin.login_by_qrcode()
if self.qrlogin.is_login:
self.nick_name = self.get_username()
self.spider_session.save_cookies_to_local(self.nick_name)
else:
raise SKException("二维码登录失败!")
def check_login(func):
"""
用户登陆态校验装饰器。若用户未登陆,则调用扫码登陆
"""
@functools.wraps(func)
def new_func(self, *args, **kwargs):
if not self.qrlogin.is_login:
logger.info("{0} 需登陆后调用,开始扫码登陆".format(func.__name__))
self.login_by_qrcode()
return func(self, *args, **kwargs)
return new_func
@check_login
def reserve(self):
"""
预约
"""
self._reserve()
@check_login
def seckill(self):
"""
抢购
"""
self._seckill()
@check_login
def seckill_by_proc_pool(self):
    """
    Run the purchase loop in a process pool.
    work_count: number of ``seckill`` tasks submitted to the pool (the pool
    itself is created with its default worker count)
    """
    # work_count is read straight from the config and may be a string;
    # range() only accepts integers.
    task_total = int(self.work_count)
    with ProcessPoolExecutor() as pool:
        for _ in range(task_total):
            pool.submit(self.seckill)
def _reserve(self):
    """
    Reserve the product, retrying after a pause until one attempt succeeds.
    """
    while True:
        try:
            self.make_reserve()
            break
        except Exception as e:
            # A %s placeholder is required; passing ``e`` as a bare extra
            # argument breaks log formatting.
            logger.info('预约发生异常! %s', e)
        wait_some_time()
def _seckill(self):
    """
    Purchase loop: open the seckill link, then keep visiting the checkout
    page and submitting the order.  Returns as soon as an order is placed;
    any exception restarts the whole sequence after a pause.
    """
    while True:
        try:
            self.request_seckill_url()
            while True:
                self.request_seckill_checkout_page()
                if self.submit_seckill_order():
                    # Order placed: stop instead of submitting again forever.
                    return
        except Exception as e:
            # A %s placeholder is required; passing ``e`` as a bare extra
            # argument breaks log formatting.
            logger.info('抢购发生异常,稍后继续执行! %s', e)
        wait_some_time()
def make_reserve(self):
    """Reserve the product.

    Asks the reservation endpoint for the reservation URL, then requests
    that URL until the request goes through.

    :raises SKException: when the endpoint response carries no ``url``
    """
    logger.info('商品名称:{}'.format(self.get_sku_title()))
    url = 'https://yushou.jd.com/youshouinfo.action?'
    payload = {
        'callback': 'fetchJSON',
        'sku': self.sku_id,
        '_': str(int(time.time() * 1000)),
    }
    headers = {
        'User-Agent': self.user_agent,
        'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
    }
    resp = self.session.get(url=url, params=payload, headers=headers)
    resp_json = parse_json(resp.text)
    reserve_url = resp_json.get('url')
    if not reserve_url:
        # Without a URL the loop below can never succeed: 'https:' + None
        # raises on every pass and the original spun forever without a pause.
        raise SKException('预约失败,接口未返回预约链接')
    # self.timers.start()
    success_message = "预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约"
    while True:
        try:
            self.session.get(url='https:' + reserve_url)
            logger.info(success_message)
            if global_config.getRaw('messenger', 'enable') == 'true':
                send_wechat(success_message)
            break
        except Exception as e:
            logger.error('预约失败正在重试... %s', e)
            # Pause between attempts instead of retrying in a tight loop.
            wait_some_time()
def get_username(self):
"""获取用户信息"""
url = 'https://passport.jd.com/user/petName/getUserInfoForMiniJd.action'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://order.jd.com/center/list.action',
}
resp = self.session.get(url=url, params=payload, headers=headers)
try_count = 5
while not resp.text.startswith("jQuery"):
try_count = try_count - 1
if try_count > 0:
resp = self.session.get(url=url, params=payload, headers=headers)
else:
break
wait_some_time()
# 响应中包含了许多用户信息,现在在其中返回昵称
# jQuery2381773({"imgUrl":"//storage.360buyimg.com/i.imageUpload/xxx.jpg","lastLoginTime":"","nickName":"xxx","plusStatus":"0","realName":"xxx","userLevel":x,"userScoreVO":{"accountScore":xx,"activityScore":xx,"consumptionScore":xxxxx,"default":false,"financeScore":xxx,"pin":"xxx","riskScore":x,"totalScore":xxxxx}})
return parse_json(resp.text).get('nickName')
def get_sku_title(self):
    """Return the text of the product page's <title>, or '' if it has none."""
    # Use the sku_id stored on the instance, like the other request helpers.
    url = 'https://item.jd.com/{}.html'.format(self.sku_id)
    resp = self.session.get(url).content
    x_data = etree.HTML(resp)
    sku_title = x_data.xpath('/html/head/title/text()')
    # xpath returns an empty list when the page has no title element.
    return sku_title[0] if sku_title else ''
def get_seckill_url(self):
"""获取商品的抢购链接
点击"抢购"按钮后,会有两次302跳转,最后到达订单结算页面
这里返回第一次跳转后的页面url,作为商品的抢购链接
:return: 商品的抢购链接
"""
url = 'https://itemko.jd.com/itemShowBtn'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'skuId': self.sku_id,
'from': 'pc',
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Host': 'itemko.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
while True:
resp = self.session.get(url=url, headers=headers, params=payload)
resp_json = parse_json(resp.text)
if resp_json.get('url'):
# https://divide.jd.com/user_rou ... %3Dpc
router_url = 'https:' + resp_json.get('url')
# https://marathon.jd.com/captch ... %3Dpc
seckill_url = router_url.replace(
'divide', 'marathon').replace(
'user_routing', 'captcha.html')
logger.info("抢购链接获取成功: %s", seckill_url)
return seckill_url
else:
logger.info("抢购链接获取失败,稍后自动重试")
wait_some_time()
def request_seckill_url(self):
"""访问商品的抢购链接(用于设置cookie等"""
logger.info('用户:{}'.format(self.get_username()))
logger.info('商品名称:{}'.format(self.get_sku_title()))
self.timers.start() # 阻塞
self.seckill_url[self.sku_id] = self.get_seckill_url()
logger.info('访问商品的抢购连接...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(
url=self.seckill_url.get(
self.sku_id),
headers=headers,
allow_redirects=False)
def request_seckill_checkout_page(self):
"""访问抢购订单结算页面"""
logger.info('访问抢购订单结算页面...')
url = 'https://marathon.jd.com/seckill/seckill.action'
payload = {
'skuId': self.sku_id,
'num': self.seckill_num,
'rid': int(time.time())
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(url=url, params=payload, headers=headers, allow_redirects=False)
def _get_seckill_init_info(self):
"""获取秒杀初始化信息(包括:地址,发票,token)
:return: 初始化信息组成的dict
"""
logger.info('获取秒杀初始化信息...')
url = 'https://marathon.jd.com/seckillnew/orderService/pc/init.action'
data = {
'sku': self.sku_id,
'num': self.seckill_num,
'isModifyAddress': 'false',
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
}
resp = self.session.post(url=url, data=data, headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception:
raise SKException('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return resp_json
def _get_seckill_order_data(self):
"""生成提交抢购订单所需的请求体参数
:return: 请求体参数组成的dict
"""
logger.info('生成提交抢购订单所需参数...')
# 获取用户秒杀初始化信息
self.seckill_init_info[self.sku_id] = self._get_seckill_init_info()
init_info = self.seckill_init_info.get(self.sku_id)
default_address = init_info['addressList'][0] # 默认地址dict
invoice_info = init_info.get('invoiceInfo', {}) # 默认发票信息dict, 有可能不返回
token = init_info['token']
data = {
'skuId': self.sku_id,
'num': self.seckill_num,
'addressId': default_address['id'],
'yuShou': 'true',
'isModifyAddress': 'false',
'name': default_address['name'],
'provinceId': default_address['provinceId'],
'cityId': default_address['cityId'],
'countyId': default_address['countyId'],
'townId': default_address['townId'],
'addressDetail': default_address['addressDetail'],
'mobile': default_address['mobile'],
'mobileKey': default_address['mobileKey'],
'email': default_address.get('email', ''),
'postCode': '',
'invoiceTitle': invoice_info.get('invoiceTitle', -1),
'invoiceCompanyName': '',
'invoiceContent': invoice_info.get('invoiceContentType', 1),
'invoiceTaxpayerNO': '',
'invoiceEmail': '',
'invoicePhone': invoice_info.get('invoicePhone', ''),
'invoicePhoneKey': invoice_info.get('invoicePhoneKey', ''),
'invoice': 'true' if invoice_info else 'false',
'password': global_config.get('account', 'payment_pwd'),
'codTimeType': 3,
'paymentType': 4,
'areaCode': '',
'overseas': 0,
'phone': '',
'eid': global_config.getRaw('config', 'eid'),
'fp': global_config.getRaw('config', 'fp'),
'token': token,
'pru': ''
}
return data
def submit_seckill_order(self):
"""提交抢购(秒杀)订单
:return: 抢购结果 True/False
"""
url = 'https://marathon.jd.com/seckillnew/orderService/pc/submitOrder.action'
payload = {
'skuId': self.sku_id,
}
try:
self.seckill_order_data[self.sku_id] = self._get_seckill_order_data()
except Exception as e:
logger.info('抢购失败,无法获取生成订单的基本信息,接口返回:【{}】'.format(str(e)))
return False
logger.info('提交抢购订单...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://marathon.jd.com/seckill/seckill.action?skuId={0}&num={1}&rid={2}'.format(
self.sku_id, self.seckill_num, int(time.time())),
}
resp = self.session.post(
url=url,
params=payload,
data=self.seckill_order_data.get(
self.sku_id),
headers=headers)
resp_json = None
try:
resp_json = parse_json(resp.text)
except Exception as e:
logger.info('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
return False
# 返回信息
# 抢购失败:
# {'errorMessage': '很遗憾没有抢到,再接再厉哦。', 'orderId': 0, 'resultCode': 60074, 'skuId': 0, 'success': False}
# {'errorMessage': '抱歉,您提交过快,请稍后再提交订单!', 'orderId': 0, 'resultCode': 60017, 'skuId': 0, 'success': False}
# {'errorMessage': '系统正在开小差,请重试~~', 'orderId': 0, 'resultCode': 90013, 'skuId': 0, 'success': False}
# 抢购成功:
# {"appUrl":"xxxxx","orderId":820227xxxxx,"pcUrl":"xxxxx","resultCode":0,"skuId":0,"success":true,"totalMoney":"xxxxx"}
if resp_json.get('success'):
order_id = resp_json.get('orderId')
total_money = resp_json.get('totalMoney')
pay_url = 'https:' + resp_json.get('pcUrl')
logger.info('抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}'.format(order_id, total_money, pay_url))
if global_config.getRaw('messenger', 'enable') == 'true':
success_message = "抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}".format(order_id, total_money, pay_url)
send_wechat(success_message)
return True
else:
logger.info('抢购失败,返回信息:{}'.format(resp_json))
if global_config.getRaw('messenger', 'enable') == 'true':
error_message = '抢购失败,返回信息:{}'.format(resp_json)
send_wechat(error_message)
return False
苏宁脚本目前仍在测试中,需要继续调试。
原创文章,
转载请注明:http://30daydo.com/article/44129
欢迎关注公众号:
可转债量化分析
查看全部
运行环境 windows,linux,mac,python3+
京东小白分查询:
https://plus.m.jd.com/rights/windControl
分太低的就不要参与了,毕竟概率会小很多
############ 2021-01-13 更新 ======
最新的用Go重写的,搞了几瓶
苏宁家的:
============= 2021-01-11 更新 ============
感觉苏宁的抢购是耍猴的,那个按钮基本处于不可点状态,所以就放弃了,感觉官方就是没放多少量,加上苏宁公司过往的尿性,所以洗洗睡了
main.py
import sys
from maotai.jd_spider_requests import ProdectPurchase
if __name__ == '__main__':
    # Show the menu, read the user's choice and dispatch to the matching action.
    tip = """
功能列表:
1.预约商品
2.秒杀抢购商品
"""
    print(tip)
    product = ProdectPurchase()
    choice_function = input('请选择:')
    # Reject unknown choices first, then handle the two valid ones.
    if choice_function not in ('1', '2'):
        print('没有此功能')
        sys.exit(1)
    if choice_function == '1':
        product.reserve()
    else:
        product.seckill_by_proc_pool()
jd_spider_requests.py
import random
import time
import requests
import functools
import json
import os
import pickle
from lxml import etree
from error.exception import SKException
from maotai.jd_logger import logger
from maotai.timer import Timer
from maotai.config import global_config
from concurrent.futures import ProcessPoolExecutor
from helper.jd_helper import (
parse_json,
send_wechat,
wait_some_time,
response_status,
save_image,
open_image
)
class SpiderSession:
    """
    Owns the shared HTTP session and saves/restores its cookies on disk.
    """

    def __init__(self):
        self.cookies_dir_path = "./cookies/"
        self.user_agent = global_config.getRaw('config', 'DEFAULT_USER_AGENT')
        self.session = self._init_session()

    def _init_session(self):
        """Create the session and install the default headers on it."""
        session = requests.session()
        session.headers = self.get_headers()
        return session

    def get_headers(self):
        """Return the default request headers (User-Agent, Accept, Connection)."""
        return {"User-Agent": self.user_agent,
                "Accept": "text/html,application/xhtml+xml,application/xml;"
                          "q=0.9,image/webp,image/apng,*/*;"
                          "q=0.8,application/signed-exchange;"
                          "v=b3",
                "Connection": "keep-alive"}

    def get_user_agent(self):
        """Return the configured User-Agent string."""
        return self.user_agent

    def get_session(self):
        """
        Return the current session.
        :return: the session created by ``_init_session``
        """
        return self.session

    def get_cookies(self):
        """
        Return the cookies held by the current session.
        :return: the session's cookie container
        """
        return self.get_session().cookies

    def set_cookies(self, cookies):
        """Merge ``cookies`` into the session's cookie container."""
        self.session.cookies.update(cookies)

    def load_cookies_from_local(self):
        """
        Load cookies from the first ``*.cookies`` file in the cookie directory.
        :return: True when a cookie file was loaded, False when the directory
                 is missing or contains no cookie file (the original returned
                 None on success, so callers could not tell the cases apart)
        """
        if not os.path.isdir(self.cookies_dir_path):
            return False
        for name in os.listdir(self.cookies_dir_path):
            if name.endswith(".cookies"):
                cookies_file = os.path.join(self.cookies_dir_path, name)
                break
        else:
            return False
        # NOTE(security): pickle.load can execute arbitrary code. Only load
        # cookie files that this program wrote itself.
        with open(cookies_file, 'rb') as f:
            local_cookies = pickle.load(f)
        self.set_cookies(local_cookies)
        return True

    def save_cookies_to_local(self, cookie_file_name):
        """
        Pickle the session cookies to ``<cookies_dir_path>/<name>.cookies``.
        :param cookie_file_name: base name (without extension) of the cookie file
        :return: None
        """
        cookies_file = os.path.join(
            self.cookies_dir_path, '{}.cookies'.format(cookie_file_name))
        directory = os.path.dirname(cookies_file)
        if directory:
            # exist_ok avoids the check-then-create race of the original.
            os.makedirs(directory, exist_ok=True)
        with open(cookies_file, 'wb') as f:
            pickle.dump(self.get_cookies(), f)
class QrLogin:
"""
扫码登录
"""
def __init__(self, spider_session: SpiderSession):
"""
初始化扫码登录
大致流程:
1、访问登录二维码页面,获取Token
2、使用Token获取票据
3、校验票据
:param spider_session:
"""
self.qrcode_img_file = 'qr_code.png'
self.spider_session = spider_session
self.session = self.spider_session.get_session()
self.is_login = False
self.refresh_login_status()
def refresh_login_status(self):
"""
刷新是否登录状态
:return:
"""
self.is_login = self._validate_cookies()
def _validate_cookies(self):
    """
    Check whether the session cookies are still valid (i.e. logged in).
    The order-list page is requested without following redirects; per the
    original note, a logged-out user is redirected to the login page, so
    only a plain 200 answer counts as logged in.
    :return: True when the cookies are valid, otherwise False
    """
    url = 'https://order.jd.com/center/list.action'
    payload = {
        'rid': str(int(time.time() * 1000)),
    }
    try:
        resp = self.session.get(url=url, params=payload, allow_redirects=False)
    except Exception as e:
        # The original passed ``e`` with no placeholder in the message, which
        # makes the logging module fail while formatting the record.
        logger.error("验证cookies是否有效发生异常: %s", e)
        return False
    return resp.status_code == requests.codes.OK
def _get_login_page(self):
"""
获取PC端登录页面
阻塞,更新cookies
:return:
"""
url = "https://passport.jd.com/new/login.aspx"
page = self.session.get(url, headers=self.spider_session.get_headers())
return page
def _get_qrcode(self):
"""
缓存并展示登录二维码
:return:
"""
url = 'https://qr.m.jd.com/show'
payload = {
'appid': 133,
'size': 147,
't': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.info('获取二维码失败')
return False
save_image(resp, self.qrcode_img_file)
logger.info('二维码获取成功,请打开京东APP扫描')
open_image(self.qrcode_img_file)
return True
def _get_qrcode_ticket(self):
"""
通过 token 获取票据 ticket
:return:
"""
url = 'https://qr.m.jd.com/check'
payload = {
'appid': '133',
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'token': self.session.cookies.get('wlfstk_smdl'), # 从cookies获取值
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/new/login.aspx',
}
resp = self.session.get(url=url, headers=headers, params=payload)
if not response_status(resp):
logger.error('获取二维码扫描结果异常')
return False
resp_json = parse_json(resp.text)
if resp_json['code'] != 200:
logger.info('Code: %s, Message: %s', resp_json['code'], resp_json['msg'])
return None
else:
logger.info('已完成手机客户端确认')
return resp_json['ticket']
def _validate_qrcode_ticket(self, ticket):
"""
通过已获取的票据进行校验
:param ticket: 已获取的票据
:return:
"""
url = 'https://passport.jd.com/uc/qrCodeTicketValidation'
headers = {
'User-Agent': self.spider_session.get_user_agent(),
'Referer': 'https://passport.jd.com/uc/login?ltype=logout',
}
resp = self.session.get(url=url, headers=headers, params={'t': ticket})
if not response_status(resp):
return False
resp_json = json.loads(resp.text)
if resp_json['returnCode'] == 0:
return True
else:
logger.info(resp_json)
return False
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
self._get_login_page() # 更新cookies
# download QR code
if not self._get_qrcode():
raise SKException('二维码下载失败')
# get QR code ticket
ticket = None
retry_times = 85
for _ in range(retry_times):
# 重试 拿到ticket
ticket = self._get_qrcode_ticket()
if ticket:
break
time.sleep(2)
else:
raise SKException('二维码过期,请重新获取扫描')
# validate QR code ticket
if not self._validate_qrcode_ticket(ticket):
raise SKException('二维码信息校验失败')
self.refresh_login_status()
logger.info('二维码登录成功')
class ProdectPurchase(object):
def __init__(self):
self.spider_session = SpiderSession()
self.spider_session.load_cookies_from_local()
# 共享一个session
self.qrlogin = QrLogin(self.spider_session)
# 初始化信息
self.sku_id = global_config.getRaw('config', 'sku_id')
self.seckill_num = global_config.getRaw('config', 'seckill_num')
self.work_count = global_config.getRaw('config','process_num')
self.seckill_init_info = dict()
self.seckill_url = dict()
self.seckill_order_data = dict()
self.timers = Timer()
self.session = self.spider_session.get_session()
self.user_agent = self.spider_session.user_agent
self.nick_name = None
def login_by_qrcode(self):
"""
二维码登陆
:return:
"""
if self.qrlogin.is_login:
logger.info('登录成功')
return
self.qrlogin.login_by_qrcode()
if self.qrlogin.is_login:
self.nick_name = self.get_username()
self.spider_session.save_cookies_to_local(self.nick_name)
else:
raise SKException("二维码登录失败!")
def check_login(func):
"""
用户登陆态校验装饰器。若用户未登陆,则调用扫码登陆
"""
@functools.wraps(func)
def new_func(self, *args, **kwargs):
if not self.qrlogin.is_login:
logger.info("{0} 需登陆后调用,开始扫码登陆".format(func.__name__))
self.login_by_qrcode()
return func(self, *args, **kwargs)
return new_func
@check_login
def reserve(self):
"""
预约
"""
self._reserve()
@check_login
def seckill(self):
"""
抢购
"""
self._seckill()
@check_login
def seckill_by_proc_pool(self):
    """
    Run the purchase loop in a process pool.
    work_count: number of ``seckill`` tasks submitted to the pool (the pool
    itself is created with its default worker count)
    """
    # work_count is read straight from the config and may be a string;
    # range() only accepts integers.
    task_total = int(self.work_count)
    with ProcessPoolExecutor() as pool:
        for _ in range(task_total):
            pool.submit(self.seckill)
def _reserve(self):
    """
    Reserve the product, retrying after a pause until one attempt succeeds.
    """
    while True:
        try:
            self.make_reserve()
            break
        except Exception as e:
            # A %s placeholder is required; passing ``e`` as a bare extra
            # argument breaks log formatting.
            logger.info('预约发生异常! %s', e)
        wait_some_time()
def _seckill(self):
    """
    Purchase loop: open the seckill link, then keep visiting the checkout
    page and submitting the order.  Returns as soon as an order is placed;
    any exception restarts the whole sequence after a pause.
    """
    while True:
        try:
            self.request_seckill_url()
            while True:
                self.request_seckill_checkout_page()
                if self.submit_seckill_order():
                    # Order placed: stop instead of submitting again forever.
                    return
        except Exception as e:
            # A %s placeholder is required; passing ``e`` as a bare extra
            # argument breaks log formatting.
            logger.info('抢购发生异常,稍后继续执行! %s', e)
        wait_some_time()
def make_reserve(self):
    """Reserve the product.

    Asks the reservation endpoint for the reservation URL, then requests
    that URL until the request goes through.

    :raises SKException: when the endpoint response carries no ``url``
    """
    logger.info('商品名称:{}'.format(self.get_sku_title()))
    url = 'https://yushou.jd.com/youshouinfo.action?'
    payload = {
        'callback': 'fetchJSON',
        'sku': self.sku_id,
        '_': str(int(time.time() * 1000)),
    }
    headers = {
        'User-Agent': self.user_agent,
        'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
    }
    resp = self.session.get(url=url, params=payload, headers=headers)
    resp_json = parse_json(resp.text)
    reserve_url = resp_json.get('url')
    if not reserve_url:
        # Without a URL the loop below can never succeed: 'https:' + None
        # raises on every pass and the original spun forever without a pause.
        raise SKException('预约失败,接口未返回预约链接')
    # self.timers.start()
    success_message = "预约成功,已获得抢购资格 / 您已成功预约过了,无需重复预约"
    while True:
        try:
            self.session.get(url='https:' + reserve_url)
            logger.info(success_message)
            if global_config.getRaw('messenger', 'enable') == 'true':
                send_wechat(success_message)
            break
        except Exception as e:
            logger.error('预约失败正在重试... %s', e)
            # Pause between attempts instead of retrying in a tight loop.
            wait_some_time()
def get_username(self):
"""获取用户信息"""
url = 'https://passport.jd.com/user/petName/getUserInfoForMiniJd.action'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Referer': 'https://order.jd.com/center/list.action',
}
resp = self.session.get(url=url, params=payload, headers=headers)
try_count = 5
while not resp.text.startswith("jQuery"):
try_count = try_count - 1
if try_count > 0:
resp = self.session.get(url=url, params=payload, headers=headers)
else:
break
wait_some_time()
# 响应中包含了许多用户信息,现在在其中返回昵称
# jQuery2381773({"imgUrl":"//storage.360buyimg.com/i.imageUpload/xxx.jpg","lastLoginTime":"","nickName":"xxx","plusStatus":"0","realName":"xxx","userLevel":x,"userScoreVO":{"accountScore":xx,"activityScore":xx,"consumptionScore":xxxxx,"default":false,"financeScore":xxx,"pin":"xxx","riskScore":x,"totalScore":xxxxx}})
return parse_json(resp.text).get('nickName')
def get_sku_title(self):
    """Return the text of the product page's <title>, or '' if it has none."""
    # Use the sku_id stored on the instance, like the other request helpers.
    url = 'https://item.jd.com/{}.html'.format(self.sku_id)
    resp = self.session.get(url).content
    x_data = etree.HTML(resp)
    sku_title = x_data.xpath('/html/head/title/text()')
    # xpath returns an empty list when the page has no title element.
    return sku_title[0] if sku_title else ''
def get_seckill_url(self):
"""获取商品的抢购链接
点击"抢购"按钮后,会有两次302跳转,最后到达订单结算页面
这里返回第一次跳转后的页面url,作为商品的抢购链接
:return: 商品的抢购链接
"""
url = 'https://itemko.jd.com/itemShowBtn'
payload = {
'callback': 'jQuery{}'.format(random.randint(1000000, 9999999)),
'skuId': self.sku_id,
'from': 'pc',
'_': str(int(time.time() * 1000)),
}
headers = {
'User-Agent': self.user_agent,
'Host': 'itemko.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
while True:
resp = self.session.get(url=url, headers=headers, params=payload)
resp_json = parse_json(resp.text)
if resp_json.get('url'):
# https://divide.jd.com/user_rou ... %3Dpc
router_url = 'https:' + resp_json.get('url')
# https://marathon.jd.com/captch ... %3Dpc
seckill_url = router_url.replace(
'divide', 'marathon').replace(
'user_routing', 'captcha.html')
logger.info("抢购链接获取成功: %s", seckill_url)
return seckill_url
else:
logger.info("抢购链接获取失败,稍后自动重试")
wait_some_time()
def request_seckill_url(self):
"""访问商品的抢购链接(用于设置cookie等"""
logger.info('用户:{}'.format(self.get_username()))
logger.info('商品名称:{}'.format(self.get_sku_title()))
self.timers.start() # 阻塞
self.seckill_url[self.sku_id] = self.get_seckill_url()
logger.info('访问商品的抢购连接...')
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(
url=self.seckill_url.get(
self.sku_id),
headers=headers,
allow_redirects=False)
def request_seckill_checkout_page(self):
"""访问抢购订单结算页面"""
logger.info('访问抢购订单结算页面...')
url = 'https://marathon.jd.com/seckill/seckill.action'
payload = {
'skuId': self.sku_id,
'num': self.seckill_num,
'rid': int(time.time())
}
headers = {
'User-Agent': self.user_agent,
'Host': 'marathon.jd.com',
'Referer': 'https://item.jd.com/{}.html'.format(self.sku_id),
}
self.session.get(url=url, params=payload, headers=headers, allow_redirects=False)
def _get_seckill_init_info(self):
    """Fetch the seckill initialisation info (address, invoice, token).

    :return: the parsed response as a dict
    :raises SKException: when the response body cannot be parsed; the
        message carries the first 128 characters of the body and the
        original parsing error is attached as ``__cause__``.
    """
    logger.info('获取秒杀初始化信息...')
    url = 'https://marathon.jd.com/seckillnew/orderService/pc/init.action'
    data = {
        'sku': self.sku_id,
        'num': self.seckill_num,
        'isModifyAddress': 'false',
    }
    headers = {
        'User-Agent': self.user_agent,
        'Host': 'marathon.jd.com',
    }
    resp = self.session.post(url=url, data=data, headers=headers)
    try:
        return parse_json(resp.text)
    except Exception as exc:
        # Chain the cause so the real parse failure stays visible in tracebacks.
        raise SKException('抢购失败,返回信息:{}'.format(resp.text[0: 128])) from exc
def _get_seckill_order_data(self):
    """Build the request-body parameters for submitting the seckill order.

    Refreshes ``self.seckill_init_info`` for the current SKU, then fills the
    order from the first entry of ``addressList``, the optional
    ``invoiceInfo`` and the ``token`` found in that init info.

    :return: dict of request-body parameters
    """
    logger.info('生成提交抢购订单所需参数...')
    # Fetch the user's seckill init info and keep it per SKU.
    self.seckill_init_info[self.sku_id] = self._get_seckill_init_info()
    info = self.seckill_init_info.get(self.sku_id)
    addr = info['addressList'][0]  # first address is used as the default
    invoice = info.get('invoiceInfo', {})  # may be absent from the response
    order_token = info['token']
    return {
        'skuId': self.sku_id,
        'num': self.seckill_num,
        'addressId': addr['id'],
        'yuShou': 'true',
        'isModifyAddress': 'false',
        'name': addr['name'],
        'provinceId': addr['provinceId'],
        'cityId': addr['cityId'],
        'countyId': addr['countyId'],
        'townId': addr['townId'],
        'addressDetail': addr['addressDetail'],
        'mobile': addr['mobile'],
        'mobileKey': addr['mobileKey'],
        'email': addr.get('email', ''),
        'postCode': '',
        'invoiceTitle': invoice.get('invoiceTitle', -1),
        'invoiceCompanyName': '',
        'invoiceContent': invoice.get('invoiceContentType', 1),
        'invoiceTaxpayerNO': '',
        'invoiceEmail': '',
        'invoicePhone': invoice.get('invoicePhone', ''),
        'invoicePhoneKey': invoice.get('invoicePhoneKey', ''),
        'invoice': 'true' if invoice else 'false',
        'password': global_config.get('account', 'payment_pwd'),
        'codTimeType': 3,
        'paymentType': 4,
        'areaCode': '',
        'overseas': 0,
        'phone': '',
        'eid': global_config.getRaw('config', 'eid'),
        'fp': global_config.getRaw('config', 'fp'),
        'token': order_token,
        'pru': ''
    }
def submit_seckill_order(self):
    """Submit the seckill (flash-sale) order.

    Builds the order data, posts it, logs the outcome and, when the
    ``messenger.enable`` setting equals ``'true'``, forwards the same
    message through ``send_wechat``.

    :return: True when the response reports success, otherwise False
        (including when the order data cannot be built or the response
        cannot be parsed)
    """
    url = 'https://marathon.jd.com/seckillnew/orderService/pc/submitOrder.action'
    payload = {
        'skuId': self.sku_id,
    }
    try:
        self.seckill_order_data[self.sku_id] = self._get_seckill_order_data()
    except Exception as e:
        logger.info('抢购失败,无法获取生成订单的基本信息,接口返回:【{}】'.format(str(e)))
        return False
    logger.info('提交抢购订单...')
    headers = {
        'User-Agent': self.user_agent,
        'Host': 'marathon.jd.com',
        'Referer': 'https://marathon.jd.com/seckill/seckill.action?skuId={0}&num={1}&rid={2}'.format(
            self.sku_id, self.seckill_num, int(time.time())),
    }
    resp = self.session.post(
        url=url,
        params=payload,
        data=self.seckill_order_data.get(self.sku_id),
        headers=headers)
    try:
        resp_json = parse_json(resp.text)
    except Exception:
        logger.info('抢购失败,返回信息:{}'.format(resp.text[0: 128]))
        return False
    # Sample responses recorded by the original author.
    # Failure:
    # {'errorMessage': '很遗憾没有抢到,再接再厉哦。', 'orderId': 0, 'resultCode': 60074, 'skuId': 0, 'success': False}
    # {'errorMessage': '抱歉,您提交过快,请稍后再提交订单!', 'orderId': 0, 'resultCode': 60017, 'skuId': 0, 'success': False}
    # {'errorMessage': '系统正在开小差,请重试~~', 'orderId': 0, 'resultCode': 90013, 'skuId': 0, 'success': False}
    # Success:
    # {"appUrl":"xxxxx","orderId":820227xxxxx,"pcUrl":"xxxxx","resultCode":0,"skuId":0,"success":true,"totalMoney":"xxxxx"}
    succeeded = bool(resp_json.get('success'))
    if succeeded:
        # A successful order must not turn into a TypeError just because the
        # payment URL is missing from the response.
        pc_url = resp_json.get('pcUrl')
        pay_url = 'https:' + pc_url if pc_url else ''
        message = '抢购成功,订单号:{}, 总价:{}, 电脑端付款链接:{}'.format(
            resp_json.get('orderId'), resp_json.get('totalMoney'), pay_url)
    else:
        message = '抢购失败,返回信息:{}'.format(resp_json)
    logger.info(message)
    if global_config.getRaw('messenger', 'enable') == 'true':
        send_wechat(message)
    return succeeded
苏宁脚本目前在测试途中,需要继续调试。
原创文章,
转载请注明:http://30daydo.com/article/44129
欢迎关注公众号:
可转债量化分析

python函数调用后面可以有一个空格
李魔佛 发表了文章 • 0 个评论 • 298 次浏览 • 2020-12-13 11:13
print ('hello')
hello
def sayhi():
...: print('Done')
...:
sayhi () # 这里有一个空格
Done
不过如果平时这么写,会被人打的 查看全部
print ('hello')
hello
def sayhi():
...: print('Done')
...:
sayhi () # 这里有一个空格
Done
不过如果平时这么写,会被人打的
导出python自带关键字 keyword
李魔佛 发表了文章 • 0 个评论 • 233 次浏览 • 2020-12-13 10:57
import keyword
keyword.kwlist
Out[3]:
['False',
'None',
'True',
'and',
'as',
'assert',
'async',
'await',
'break',
'class',
'continue',
'def',
'del',
'elif',
'else',
'except',
'finally',
'for',
'from',
'global',
'if',
'import',
'in',
'is',
'lambda',
'nonlocal',
'not',
'or',
'pass',
'raise',
'return',
'try',
'while',
'with',
'yield']
len(keyword.kwlist)
Out[4]: 35 查看全部
import keyword
keyword.kwlist
Out[3]:
['False',
'None',
'True',
'and',
'as',
'assert',
'async',
'await',
'break',
'class',
'continue',
'def',
'del',
'elif',
'else',
'except',
'finally',
'for',
'from',
'global',
'if',
'import',
'in',
'is',
'lambda',
'nonlocal',
'not',
'or',
'pass',
'raise',
'return',
'try',
'while',
'with',
'yield']
len(keyword.kwlist)
Out[4]: 35
微信公众号后台的签名校验的官方教程在python3下不兼容
李魔佛 发表了文章 • 0 个评论 • 230 次浏览 • 2020-12-11 11:43
首先文档用的python2代码写的,但文中没有标明。
python2旧就算了,而且那么多框架不用,还要用一个老掉牙的web.py来写,也是醉了。
django下的签名校验:token = '123456789'
def Services(request):
    """Answer the WeChat server-verification request (Django view).

    The SHA-1 of the sorted, concatenated ``token``, ``timestamp`` and
    ``nonce`` is compared with the ``signature`` query parameter; on a
    match the ``echostr`` parameter is echoed back.

    Returns an empty ``HttpResponse`` for non-GET requests, for requests
    with a missing parameter and for signature mismatches, so the view
    always returns a response object.
    """
    import hashlib
    import hmac

    print(request.method)
    if request.method != 'GET':
        return HttpResponse('')
    signature = request.GET.get('signature')
    echostr = request.GET.get('echostr')
    timestamp = request.GET.get('timestamp')
    nonce = request.GET.get('nonce')
    # Sorting a list that contains None raises TypeError, so reject early.
    if None in (signature, echostr, timestamp, nonce):
        return HttpResponse('')
    list_str = ''.join(sorted([token, timestamp, nonce]))
    hashcode = hashlib.sha1(list_str.encode('utf8')).hexdigest()
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    if hmac.compare_digest(hashcode.encode('utf8'), signature.encode('utf8')):
        return HttpResponse(echostr)
    return HttpResponse('')
原创文章,转载请注明出处http://30daydo.com/article/44121
查看全部
首先文档用的python2代码写的,但文中没有标明。
python2旧就算了,而且那么多框架不用,还要用一个老掉牙的web.py来写,也是醉了。
django下的签名校验:
token = '123456789'
def Services(request):
    """Answer the WeChat server-verification request (Django view).

    The SHA-1 of the sorted, concatenated ``token``, ``timestamp`` and
    ``nonce`` is compared with the ``signature`` query parameter; on a
    match the ``echostr`` parameter is echoed back.

    Returns an empty ``HttpResponse`` for non-GET requests, for requests
    with a missing parameter and for signature mismatches, so the view
    always returns a response object.
    """
    import hashlib
    import hmac

    print(request.method)
    if request.method != 'GET':
        return HttpResponse('')
    signature = request.GET.get('signature')
    echostr = request.GET.get('echostr')
    timestamp = request.GET.get('timestamp')
    nonce = request.GET.get('nonce')
    # Sorting a list that contains None raises TypeError, so reject early.
    if None in (signature, echostr, timestamp, nonce):
        return HttpResponse('')
    list_str = ''.join(sorted([token, timestamp, nonce]))
    hashcode = hashlib.sha1(list_str.encode('utf8')).hexdigest()
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    if hmac.compare_digest(hashcode.encode('utf8'), signature.encode('utf8')):
        return HttpResponse(echostr)
    return HttpResponse('')
原创文章,转载请注明出处http://30daydo.com/article/44121
pycharm 条件断点
李魔佛 发表了文章 • 0 个评论 • 256 次浏览 • 2020-12-01 13:42
先在想要停下来的地方打一个断点,然后再点击一下断点,弹出的一个条件断点的窗口,在窗口输入一个条件即可。
比如:
def main():
    """Print i*i for every i in range(100), one value per line."""
    for i in range(100):
        print(i*i)


if __name__ == '__main__':
    main()
在i*i的地方打一个断点,然后在条件断点那里输入 i==50(条件必须是一个布尔表达式,要用 == 而不是赋值的 =), 那么在调试模式下只有在 i 等于 50 的时候才会停下来。
查看全部
先在想要停下来的地方打一个断点,然后再点击一下断点,弹出的一个条件断点的窗口,在窗口输入一个条件即可。
比如:
def main():
    """Print i*i for every i in range(100), one value per line."""
    for i in range(100):
        print(i*i)


if __name__ == '__main__':
    main()
在i*i的地方打一个断点,然后在条件断点那里输入 i==50(条件必须是一个布尔表达式,要用 == 而不是赋值的 =), 那么在调试模式下只有在 i 等于 50 的时候才会停下来。
python执行js语句,无函数返回值
李魔佛 发表了文章 • 0 个评论 • 248 次浏览 • 2020-11-30 16:11
如下面的一段JSvar radra27radra27 = "D";
var ra72419ra91ra72419ra91 = "7.241.9" + ".";
var raurst500ra63raurst500ra63 = "urst=500";
var ravalidtora49ravalidtora49 = "validto=";
var raevphncdra57raevphncdra57 = "ev.ph" + "ncd";
var ra16067161ra17ra16067161ra17 = "16067161";
var radeos202ra16radeos202ra16 = "deos/20" + "2";
var ra1080p4ra73ra1080p4ra73 = "/1080P_4";
var ra209hashra72ra209hashra72 = "209&hash";
var ra2bmkdz7nra36ra2bmkdz7nra36 = "2BMKd" + "z7N";
var ra6708909ra29ra6708909ra29 = "6708909" + "&";
var ra00kip4ra41ra00kip4ra41 = "00k&i" + "p=4";
var ra006163ra73ra006163ra73 = "006/16/" + "3";
var raro7upu3ra66raro7upu3ra66 = "Ro7UPU%3";
var raroiu6qra26raroiu6qra26 = "=rOiU6q%";
var ra075351mra26ra075351mra26 = "075351." + "m";
var ramgdmctbvra11ramgdmctbvra11 = "MgdmCtbV";
var rap4validra25rap4validra25 = "p4?valid";
var ra09ratera79ra09ratera79 = "09&rate=";
var rancomvira35rancomvira35 = "n.com/vi";
var ra24075351ra94ra24075351ra94 = "24075" + "351";
var ra000k324ra70ra000k324ra70 = "000K_324";
var ra50000kbra49ra50000kbra49 = "50000k&" + "b";
var rahttpsra83rahttpsra83 = "https://";
var rafrom160ra56rafrom160ra56 = "from=16" + "0";
var quality_1080p =/* + radra27radra27 + */rahttpsra83rahttpsra83 + /* + rancomvira35rancomvira35 + */raevphncdra57raevphncdra57 + /* + radra27radra27 + */rancomvira35rancomvira35 + /* + ra006163ra73ra006163ra73 + */radeos202ra16radeos202ra16 + /* + ra09ratera79ra09ratera79 + */ra006163ra73ra006163ra73 + /* + ra1080p4ra73ra1080p4ra73 + */ra24075351ra94ra24075351ra94 + /* + raroiu6qra26raroiu6qra26 + */ra1080p4ra73ra1080p4ra73 + /* + ra000k324ra70ra000k324ra70 + */ra000k324ra70ra000k324ra70 + /* + rancomvira35rancomvira35 + */ra075351mra26ra075351mra26 + /* + ravalidtora49ravalidtora49 + */rap4validra25rap4validra25 + /* + ra209hashra72ra209hashra72 + */rafrom160ra56rafrom160ra56 + /* + ra1080p4ra73ra1080p4ra73 + */ra6708909ra29ra6708909ra29 + /* + ra209hashra72ra209hashra72 + */ravalidtora49ravalidtora49 + /* + ramgdmctbvra11ramgdmctbvra11 + */ra16067161ra17ra16067161ra17 + /* + ra24075351ra94ra24075351ra94 + */ra09ratera79ra09ratera79 + /* + ra50000kbra49ra50000kbra49 + */ra50000kbra49ra50000kbra49 + /* + ramgdmctbvra11ramgdmctbvra11 + */raurst500ra63raurst500ra63 + /* + ra209hashra72ra209hashra72 + */ra00kip4ra41ra00kip4ra41 + /* + raroiu6qra26raroiu6qra26 + */ra72419ra91ra72419ra91 + /* + ra09ratera79ra09ratera79 + */ra209hashra72ra209hashra72 + /* + raro7upu3ra66raro7upu3ra66 + */raroiu6qra26raroiu6qra26 + /* + ra075351mra26ra075351mra26 + */ra2bmkdz7nra36ra2bmkdz7nra36 + /* + ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
做了很多的运算,掩人耳目,虽然看多几下,用python写也简单,或者把它放入一个function里面也可以。
比如实时网络上下载获取一段js,然后再在头部和尾部组装为function。
function getValue(){
xxxxxx
xxxxxx
return flashvars_324075351
}
然后执行这一段JS,call返回函数 getValue() 就可以拿到返回值了。
不过今天我们用其他的方法直接获取flashvars_324075351
使用js2py库即可。
把上面的JS语句var radra27radra27 = "D";
var ra72419ra91ra72419ra91 = "7.241.9" + ".";
var raurst500ra63raurst500ra63 = "urst=500";
var ravalidtora49ravalidtora49 = "validto=";
var raevphncdra57raevphncdra57 = "ev.ph" + "ncd";
var ra16067161ra17ra16067161ra17 = "16067161";
var radeos202ra16radeos202ra16 = "deos/20" + "2";
var ra1080p4ra73ra1080p4ra73 = "/1080P_4";
var ra209hashra72ra209hashra72 = "209&hash";
var ra2bmkdz7nra36ra2bmkdz7nra36 = "2BMKd" + "z7N";
var ra6708909ra29ra6708909ra29 = "6708909" + "&";
var ra00kip4ra41ra00kip4ra41 = "00k&i" + "p=4";
var ra006163ra73ra006163ra73 = "006/16/" + "3";
var raro7upu3ra66raro7upu3ra66 = "Ro7UPU%3";
var raroiu6qra26raroiu6qra26 = "=rOiU6q%";
var ra075351mra26ra075351mra26 = "075351." + "m";
var ramgdmctbvra11ramgdmctbvra11 = "MgdmCtbV";
var rap4validra25rap4validra25 = "p4?valid";
var ra09ratera79ra09ratera79 = "09&rate=";
var rancomvira35rancomvira35 = "n.com/vi";
var ra24075351ra94ra24075351ra94 = "24075" + "351";
var ra000k324ra70ra000k324ra70 = "000K_324";
var ra50000kbra49ra50000kbra49 = "50000k&" + "b";
var rahttpsra83rahttpsra83 = "https://";
var rafrom160ra56rafrom160ra56 = "from=16" + "0";
var quality_1080p =/* + radra27radra27 + */rahttpsra83rahttpsra83 + /* + rancomvira35rancomvira35 + */raevphncdra57raevphncdra57 + /* + radra27radra27 + */rancomvira35rancomvira35 + /* + ra006163ra73ra006163ra73 + */radeos202ra16radeos202ra16 + /* + ra09ratera79ra09ratera79 + */ra006163ra73ra006163ra73 + /* + ra1080p4ra73ra1080p4ra73 + */ra24075351ra94ra24075351ra94 + /* + raroiu6qra26raroiu6qra26 + */ra1080p4ra73ra1080p4ra73 + /* + ra000k324ra70ra000k324ra70 + */ra000k324ra70ra000k324ra70 + /* + rancomvira35rancomvira35 + */ra075351mra26ra075351mra26 + /* + ravalidtora49ravalidtora49 + */rap4validra25rap4validra25 + /* + ra209hashra72ra209hashra72 + */rafrom160ra56rafrom160ra56 + /* + ra1080p4ra73ra1080p4ra73 + */ra6708909ra29ra6708909ra29 + /* + ra209hashra72ra209hashra72 + */ravalidtora49ravalidtora49 + /* + ramgdmctbvra11ramgdmctbvra11 + */ra16067161ra17ra16067161ra17 + /* + ra24075351ra94ra24075351ra94 + */ra09ratera79ra09ratera79 + /* + ra50000kbra49ra50000kbra49 + */ra50000kbra49ra50000kbra49 + /* + ramgdmctbvra11ramgdmctbvra11 + */raurst500ra63raurst500ra63 + /* + ra209hashra72ra209hashra72 + */ra00kip4ra41ra00kip4ra41 + /* + raroiu6qra26raroiu6qra26 + */ra72419ra91ra72419ra91 + /* + ra09ratera79ra09ratera79 + */ra209hashra72ra209hashra72 + /* + raro7upu3ra66raro7upu3ra66 + */raroiu6qra26raroiu6qra26 + /* + ra075351mra26ra075351mra26 + */ra2bmkdz7nra36ra2bmkdz7nra36 + /* + ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
后面加一个返回值,但不需要加return
比如....
ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
flashvars_324075351;
然后直接调用js2py
res = js2py.eval_js(js)
执行后print(res) , 显示的值就是flashvars_324075351
原创文章,转载请注明出处
http://30daydo.com/article/44112
查看全部
如下面的一段JS
var radra27radra27 = "D";
var ra72419ra91ra72419ra91 = "7.241.9" + ".";
var raurst500ra63raurst500ra63 = "urst=500";
var ravalidtora49ravalidtora49 = "validto=";
var raevphncdra57raevphncdra57 = "ev.ph" + "ncd";
var ra16067161ra17ra16067161ra17 = "16067161";
var radeos202ra16radeos202ra16 = "deos/20" + "2";
var ra1080p4ra73ra1080p4ra73 = "/1080P_4";
var ra209hashra72ra209hashra72 = "209&hash";
var ra2bmkdz7nra36ra2bmkdz7nra36 = "2BMKd" + "z7N";
var ra6708909ra29ra6708909ra29 = "6708909" + "&";
var ra00kip4ra41ra00kip4ra41 = "00k&i" + "p=4";
var ra006163ra73ra006163ra73 = "006/16/" + "3";
var raro7upu3ra66raro7upu3ra66 = "Ro7UPU%3";
var raroiu6qra26raroiu6qra26 = "=rOiU6q%";
var ra075351mra26ra075351mra26 = "075351." + "m";
var ramgdmctbvra11ramgdmctbvra11 = "MgdmCtbV";
var rap4validra25rap4validra25 = "p4?valid";
var ra09ratera79ra09ratera79 = "09&rate=";
var rancomvira35rancomvira35 = "n.com/vi";
var ra24075351ra94ra24075351ra94 = "24075" + "351";
var ra000k324ra70ra000k324ra70 = "000K_324";
var ra50000kbra49ra50000kbra49 = "50000k&" + "b";
var rahttpsra83rahttpsra83 = "https://";
var rafrom160ra56rafrom160ra56 = "from=16" + "0";
var quality_1080p =/* + radra27radra27 + */rahttpsra83rahttpsra83 + /* + rancomvira35rancomvira35 + */raevphncdra57raevphncdra57 + /* + radra27radra27 + */rancomvira35rancomvira35 + /* + ra006163ra73ra006163ra73 + */radeos202ra16radeos202ra16 + /* + ra09ratera79ra09ratera79 + */ra006163ra73ra006163ra73 + /* + ra1080p4ra73ra1080p4ra73 + */ra24075351ra94ra24075351ra94 + /* + raroiu6qra26raroiu6qra26 + */ra1080p4ra73ra1080p4ra73 + /* + ra000k324ra70ra000k324ra70 + */ra000k324ra70ra000k324ra70 + /* + rancomvira35rancomvira35 + */ra075351mra26ra075351mra26 + /* + ravalidtora49ravalidtora49 + */rap4validra25rap4validra25 + /* + ra209hashra72ra209hashra72 + */rafrom160ra56rafrom160ra56 + /* + ra1080p4ra73ra1080p4ra73 + */ra6708909ra29ra6708909ra29 + /* + ra209hashra72ra209hashra72 + */ravalidtora49ravalidtora49 + /* + ramgdmctbvra11ramgdmctbvra11 + */ra16067161ra17ra16067161ra17 + /* + ra24075351ra94ra24075351ra94 + */ra09ratera79ra09ratera79 + /* + ra50000kbra49ra50000kbra49 + */ra50000kbra49ra50000kbra49 + /* + ramgdmctbvra11ramgdmctbvra11 + */raurst500ra63raurst500ra63 + /* + ra209hashra72ra209hashra72 + */ra00kip4ra41ra00kip4ra41 + /* + raroiu6qra26raroiu6qra26 + */ra72419ra91ra72419ra91 + /* + ra09ratera79ra09ratera79 + */ra209hashra72ra209hashra72 + /* + raro7upu3ra66raro7upu3ra66 + */raroiu6qra26raroiu6qra26 + /* + ra075351mra26ra075351mra26 + */ra2bmkdz7nra36ra2bmkdz7nra36 + /* + ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
做了很多的运算,掩人耳目,虽然看多几下,用python写也简单,或者把它放入一个function里面也可以。
比如实时网络上下载获取一段js,然后再在头部和尾部组装为function。
function getValue(){
xxxxxx
xxxxxx
return flashvars_324075351
}
然后执行这一段JS,call返回函数 getValue() 就可以拿到返回值了。
不过今天我们用其他的方法直接获取flashvars_324075351
使用js2py库即可。
把上面的JS语句
var radra27radra27 = "D";
var ra72419ra91ra72419ra91 = "7.241.9" + ".";
var raurst500ra63raurst500ra63 = "urst=500";
var ravalidtora49ravalidtora49 = "validto=";
var raevphncdra57raevphncdra57 = "ev.ph" + "ncd";
var ra16067161ra17ra16067161ra17 = "16067161";
var radeos202ra16radeos202ra16 = "deos/20" + "2";
var ra1080p4ra73ra1080p4ra73 = "/1080P_4";
var ra209hashra72ra209hashra72 = "209&hash";
var ra2bmkdz7nra36ra2bmkdz7nra36 = "2BMKd" + "z7N";
var ra6708909ra29ra6708909ra29 = "6708909" + "&";
var ra00kip4ra41ra00kip4ra41 = "00k&i" + "p=4";
var ra006163ra73ra006163ra73 = "006/16/" + "3";
var raro7upu3ra66raro7upu3ra66 = "Ro7UPU%3";
var raroiu6qra26raroiu6qra26 = "=rOiU6q%";
var ra075351mra26ra075351mra26 = "075351." + "m";
var ramgdmctbvra11ramgdmctbvra11 = "MgdmCtbV";
var rap4validra25rap4validra25 = "p4?valid";
var ra09ratera79ra09ratera79 = "09&rate=";
var rancomvira35rancomvira35 = "n.com/vi";
var ra24075351ra94ra24075351ra94 = "24075" + "351";
var ra000k324ra70ra000k324ra70 = "000K_324";
var ra50000kbra49ra50000kbra49 = "50000k&" + "b";
var rahttpsra83rahttpsra83 = "https://";
var rafrom160ra56rafrom160ra56 = "from=16" + "0";
var quality_1080p =/* + radra27radra27 + */rahttpsra83rahttpsra83 + /* + rancomvira35rancomvira35 + */raevphncdra57raevphncdra57 + /* + radra27radra27 + */rancomvira35rancomvira35 + /* + ra006163ra73ra006163ra73 + */radeos202ra16radeos202ra16 + /* + ra09ratera79ra09ratera79 + */ra006163ra73ra006163ra73 + /* + ra1080p4ra73ra1080p4ra73 + */ra24075351ra94ra24075351ra94 + /* + raroiu6qra26raroiu6qra26 + */ra1080p4ra73ra1080p4ra73 + /* + ra000k324ra70ra000k324ra70 + */ra000k324ra70ra000k324ra70 + /* + rancomvira35rancomvira35 + */ra075351mra26ra075351mra26 + /* + ravalidtora49ravalidtora49 + */rap4validra25rap4validra25 + /* + ra209hashra72ra209hashra72 + */rafrom160ra56rafrom160ra56 + /* + ra1080p4ra73ra1080p4ra73 + */ra6708909ra29ra6708909ra29 + /* + ra209hashra72ra209hashra72 + */ravalidtora49ravalidtora49 + /* + ramgdmctbvra11ramgdmctbvra11 + */ra16067161ra17ra16067161ra17 + /* + ra24075351ra94ra24075351ra94 + */ra09ratera79ra09ratera79 + /* + ra50000kbra49ra50000kbra49 + */ra50000kbra49ra50000kbra49 + /* + ramgdmctbvra11ramgdmctbvra11 + */raurst500ra63raurst500ra63 + /* + ra209hashra72ra209hashra72 + */ra00kip4ra41ra00kip4ra41 + /* + raroiu6qra26raroiu6qra26 + */ra72419ra91ra72419ra91 + /* + ra09ratera79ra09ratera79 + */ra209hashra72ra209hashra72 + /* + raro7upu3ra66raro7upu3ra66 + */raroiu6qra26raroiu6qra26 + /* + ra075351mra26ra075351mra26 + */ra2bmkdz7nra36ra2bmkdz7nra36 + /* + ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
后面加一个返回值,但不需要加return
比如
....
ra50000kbra49ra50000kbra49 + */ramgdmctbvra11ramgdmctbvra11 + /* + radeos202ra16radeos202ra16 + */raro7upu3ra66raro7upu3ra66 + /* + ra075351mra26ra075351mra26 + */radra27radra27;
flashvars_324075351["quality_1080p"] = quality_1080p;
flashvars_324075351;
然后直接调用js2py
res = js2py.eval_js(js)
执行后print(res) , 显示的值就是flashvars_324075351
原创文章,转载请注明出处
http://30daydo.com/article/44112
python多重继承的super调用父类的兄弟类
李魔佛 发表了文章 • 0 个评论 • 259 次浏览 • 2020-11-26 19:04
class A:
    def __init__(self):
        print('A init')
        print(self)


class B:
    def __init__(self):
        print('B init')
        print(self)


class C:
    def __init__(self):
        print('C init')
        print(self)


class D(C, B, A):
    """Demo: super(X, self) dispatches to the class AFTER X in D's MRO."""

    def __init__(self):
        # D's MRO is [D, C, B, A, object].
        super(A, self).__init__()  # after A comes object: prints nothing
        super(C, self).__init__()  # after C comes B: runs B.__init__
        super(B, self).__init__()  # after B comes A: runs A.__init__
        print('D ')


def main():
    d = D()
看输出的是什么:B init
<__main__.D object at 0x00000000026365B0>
A init
<__main__.D object at 0x00000000026365B0>
D init-
为什么不输出C init ?
那么这个要从super的函数实现说起:
def super(class, obj):
mro_list = obj.__class__.mro()
next_parent_class = mro_list[mro_list.index(class)+1]
return next_parent_class
super函数中,mro_list得到的是类的mro列表,mro列表就是类的继承有序关系图
比如上面代码中
C, B, A 在D中的mro是下面这样的。[<class '__main__.D'>, <class '__main__.C'>, <class '__main__.B'>, <class '__main__.A'>, <class 'object'>]
你提供的类名class传入来后,比如是C,那么找到C所在的index,然后加1后的class,也就是B,所以super(C,self).__init__()
实际调用的是B.__init__()
另外,如果在多重继承中,要调用父类的父类的父类。。。。,
可以直接用该类的类名就可以
# 多重继承
class A:
    def __init__(self):
        print('A init')
        print(self)

    def fun(self):
        print('A', self)


class B(A):
    def __init__(self):
        print('B init')
        print(self)

    def fun(self):
        print('B', self)


class C(B):
    def __init__(self):
        print('C init')
        print(self)

    def fun(self):
        print('C', self)


class X:
    """Unrelated class: not an ancestor of D."""

    def fun(self):
        print('X', self)


class D(C):
    """Demo: any class's method can be called directly by class name."""

    def __init__(self):
        # No parent __init__ is invoked here; only this line prints.
        print('D init')

    def fun(self):
        # Call each implementation explicitly through its class, including
        # X, which is outside D's inheritance chain.
        for klass in (C, B, A, X):
            klass.fun(self)


def main():
    d = D()
    print(type(d).mro())
    d.fun()


if __name__ == '__main__':
    main()
结果:D init
[<class '__main__.D'>, <class '__main__.C'>, <class '__main__.B'>, <class '__main__.A'>, <class 'object'>]
C <__main__.D object at 0x00000000025F6700>
B <__main__.D object at 0x00000000025F6700>
A <__main__.D object at 0x00000000025F6700>
X <__main__.D object at 0x00000000025F6700>
其实super和父类子类没什么关系, super关系的是mro里面的顺序。
原文链接:http://30daydo.com/article/44107
转载请注明出处
查看全部
class A:
    def __init__(self):
        print('A init')
        print(self)


class B:
    def __init__(self):
        print('B init')
        print(self)


class C:
    def __init__(self):
        print('C init')
        print(self)


class D(C, B, A):
    """Demo: super(X, self) dispatches to the class AFTER X in D's MRO."""

    def __init__(self):
        # D's MRO is [D, C, B, A, object].
        super(A, self).__init__()  # after A comes object: prints nothing
        super(C, self).__init__()  # after C comes B: runs B.__init__
        super(B, self).__init__()  # after B comes A: runs A.__init__
        print('D ')


def main():
    d = D()
看输出的是什么:
B init
<__main__.D object at 0x00000000026365B0>
A init
<__main__.D object at 0x00000000026365B0>
D init-
为什么不输出C init ?
那么这个要从super的函数实现说起:
def super(cls, obj):
    """Simplified model of the built-in ``super``.

    Returns the class that follows ``cls`` in the MRO of ``obj``'s class.
    The parameter is named ``cls`` because ``class`` is a reserved word and
    cannot be used as an identifier.
    """
    mro_list = obj.__class__.mro()
    next_parent_class = mro_list[mro_list.index(cls)+1]
    return next_parent_class
super函数中,mro_list得到的是类的mro列表,mro列表就是类的继承有序关系图
比如上面代码中
C, B, A 在D中的mro是下面这样的。
[<class '__main__.D'>, <class '__main__.C'>, <class '__main__.B'>, <class '__main__.A'>, <class 'object'>]
你提供的类名class传入来后,比如是C,那么找到C所在的index,然后加1后的class,也就是B,所以super(C,self).__init__()
实际调用的是B.__init__()
另外,如果在多重继承中,要调用父类的父类的父类。。。。,
可以直接用该类的类名就可以
# Multi-level inheritance: D -> C -> B -> A
class A:
    def __init__(self):
        print('A init')
        print(self)

    def fun(self):
        print('A', self)


class B(A):
    def __init__(self):
        print('B init')
        print(self)

    def fun(self):
        print('B', self)


class C(B):
    def __init__(self):
        print('C init')
        print(self)

    def fun(self):
        print('C', self)


class X:
    """Unrelated class: not an ancestor of D."""

    def fun(self):
        print('X', self)


class D(C):
    """Demo: any class's method can be called directly by class name."""

    def __init__(self):
        # No parent __init__ is invoked here; only this line prints.
        print('D init')

    def fun(self):
        # Call each implementation explicitly through its class, including
        # X, which is outside D's inheritance chain.
        for klass in (C, B, A, X):
            klass.fun(self)


def main():
    d = D()
    print(type(d).mro())
    d.fun()


if __name__ == '__main__':
    main()
结果:
D init
[<class '__main__.D'>, <class '__main__.C'>, <class '__main__.B'>, <class '__main__.A'>, <class 'object'>]
C <__main__.D object at 0x00000000025F6700>
B <__main__.D object at 0x00000000025F6700>
A <__main__.D object at 0x00000000025F6700>
X <__main__.D object at 0x00000000025F6700>
其实super和父类子类没什么关系, super关系的是mro里面的顺序。
原文链接:http://30daydo.com/article/44107
转载请注明出处
python pathspec 库的作用
李魔佛 发表了文章 • 0 个评论 • 257 次浏览 • 2020-11-25 13:28
看以下实例:
# Cache for the PathSpec parsed from .gitignore. It has to exist before
# get_ignore_matches() reads it, otherwise the first call raises NameError.
ignore_matches = None


def get_ignore_matches():
    """Return the PathSpec built from ``.gitignore`` in the current directory.

    The file is parsed once with the ``gitwildmatch`` syntax and cached in the
    module-level ``ignore_matches``. Returns None when no ``.gitignore``
    exists in the current working directory.
    """
    global ignore_matches
    ignore_file = os.path.join(os.path.abspath(os.curdir), '.gitignore')
    if not os.path.exists(ignore_file):
        return None
    if ignore_matches is None:
        with open(ignore_file, 'r') as fh:
            ignore_matches = pathspec.PathSpec.from_lines('gitwildmatch', fh)
    return ignore_matches


def is_ignored(file_name: str) -> bool:
    """Return True when *file_name* matches a ``.gitignore`` pattern.

    Returns False when there is no ``.gitignore`` to match against.
    """
    matches = get_ignore_matches()
    if matches is None:
        return False
    return matches.match_file(file_name)
gitignore文件里面的内容就会被匹配到
.idea/
build/
dist/
venv/
*.pyc
__pycache__/
*.egg-info/
tmp/ 查看全部
看以下实例:
# Cache for the PathSpec parsed from .gitignore. It has to exist before
# get_ignore_matches() reads it, otherwise the first call raises NameError.
ignore_matches = None


def get_ignore_matches():
    """Return the PathSpec built from ``.gitignore`` in the current directory.

    The file is parsed once with the ``gitwildmatch`` syntax and cached in the
    module-level ``ignore_matches``. Returns None when no ``.gitignore``
    exists in the current working directory.
    """
    global ignore_matches
    ignore_file = os.path.join(os.path.abspath(os.curdir), '.gitignore')
    if not os.path.exists(ignore_file):
        return None
    if ignore_matches is None:
        with open(ignore_file, 'r') as fh:
            ignore_matches = pathspec.PathSpec.from_lines('gitwildmatch', fh)
    return ignore_matches


def is_ignored(file_name: str) -> bool:
    """Return True when *file_name* matches a ``.gitignore`` pattern.

    Returns False when there is no ``.gitignore`` to match against.
    """
    matches = get_ignore_matches()
    if matches is None:
        return False
    return matches.match_file(file_name)
gitignore文件里面的内容就会被匹配到
.idea/
build/
dist/
venv/
*.pyc
__pycache__/
*.egg-info/
tmp/
夜深了,你们还在吗?
chenchen 发表了文章 • 0 个评论 • 173 次浏览 • 2020-11-20 22:34
大家好啊,日常报道,关照关照
chenchen 发表了文章 • 0 个评论 • 183 次浏览 • 2020-11-20 17:11
异步asyncio加锁 的正确用法
李魔佛 发表了文章 • 0 个评论 • 367 次浏览 • 2020-11-15 10:19
import aiohttp
import asyncio
import execjs
import threading

# Progress state shared by main() and fetch().
pages = 0
count = 0
headers = {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Host": "dcfm.eastmoney.com",
    "Pragma": "no-cache",
    "Referer": "http://data.eastmoney.com/xg/xg/default.html",
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/69.0.3497.81 Chrome/69.0.3497.81 Safari/537.36",
}
home_url = 'http://dcfm.eastmoney.com/em_mutisvcexpandinterface/api/js/get?type=XGSG_LB&token=70f12f2f4f091e459a279469fe49eca5&st=purchasedate,securitycode&sr=-1&p={}&ps=50&js=var%20hsEnHLwG={{pages:(tp),data:(x)}}&rt=53512217'
# Do not use threading.Lock() here. The asyncio.Lock is created inside main()
# so that it belongs to the event loop that asyncio.run() starts.
lock = None


def parse_json(content):
    """Evaluate the returned JS (``var hsEnHLwG={...}``) and return its value."""
    content += ';function getV(){return hsEnHLwG;}'
    ctx = execjs.compile(content)
    result = ctx.call('getV')
    return result


async def fetch(session, page):
    """Download one result page, parse it and bump the shared counter."""
    global count
    async with session.get(home_url.format(page), headers=headers) as resp:
        content = await resp.text()
    try:
        js_content = parse_json(content)
        for stock_info in js_content['data']:
            securityshortname = stock_info['securityshortname']
            # print(securityshortname)
    except Exception as e:
        print(e)
    # asyncio locks are acquired with "async with", not a plain "with".
    async with lock:
        count = count + 1
        print(f'count:{count}')
        if count == pages:
            print('End of loop')


async def main():
    """Read the page count from page 1, then fetch every page."""
    global pages
    global count
    global lock
    count = 0
    lock = asyncio.Lock()
    async with aiohttp.ClientSession() as session:
        async with session.get(home_url.format(1), headers=headers) as resp:
            content = await resp.text()
        js_data = parse_json(content)
        pages = js_data['pages']
        print(f'pages: {pages}')
        tasks = []
        for page in range(1, pages + 1):
            tasks.append(asyncio.ensure_future(fetch(session, page)))
            await asyncio.sleep(1)  # start one request per second
        # Wait for every fetch before the session is closed; a failed
        # request now surfaces as an exception instead of hanging forever.
        await asyncio.gather(*tasks)


asyncio.run(main())
1. 如果不加入锁,每次运行的结果可能不一样。
2. 不能用多线程的threading 锁,得到的每次运行结果也有可能不一样
3. 用asyncio的锁要 加关键字 async
查看全部
import aiohttp
import asyncio
import execjs
import threading

# Progress state shared by main() and fetch().
pages = 0
count = 0
headers = {
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate",
    "Accept-Language": "en-US,en;q=0.9",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Host": "dcfm.eastmoney.com",
    "Pragma": "no-cache",
    "Referer": "http://data.eastmoney.com/xg/xg/default.html",
    "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/69.0.3497.81 Chrome/69.0.3497.81 Safari/537.36",
}
home_url = 'http://dcfm.eastmoney.com/em_mutisvcexpandinterface/api/js/get?type=XGSG_LB&token=70f12f2f4f091e459a279469fe49eca5&st=purchasedate,securitycode&sr=-1&p={}&ps=50&js=var%20hsEnHLwG={{pages:(tp),data:(x)}}&rt=53512217'
# Do not use threading.Lock() here. The asyncio.Lock is created inside main()
# so that it belongs to the event loop that asyncio.run() starts.
lock = None


def parse_json(content):
    """Evaluate the returned JS (``var hsEnHLwG={...}``) and return its value."""
    content += ';function getV(){return hsEnHLwG;}'
    ctx = execjs.compile(content)
    result = ctx.call('getV')
    return result


async def fetch(session, page):
    """Download one result page, parse it and bump the shared counter."""
    global count
    async with session.get(home_url.format(page), headers=headers) as resp:
        content = await resp.text()
    try:
        js_content = parse_json(content)
        for stock_info in js_content['data']:
            securityshortname = stock_info['securityshortname']
            # print(securityshortname)
    except Exception as e:
        print(e)
    # asyncio locks are acquired with "async with", not a plain "with".
    async with lock:
        count = count + 1
        print(f'count:{count}')
        if count == pages:
            print('End of loop')


async def main():
    """Read the page count from page 1, then fetch every page."""
    global pages
    global count
    global lock
    count = 0
    lock = asyncio.Lock()
    async with aiohttp.ClientSession() as session:
        async with session.get(home_url.format(1), headers=headers) as resp:
            content = await resp.text()
        js_data = parse_json(content)
        pages = js_data['pages']
        print(f'pages: {pages}')
        tasks = []
        for page in range(1, pages + 1):
            tasks.append(asyncio.ensure_future(fetch(session, page)))
            await asyncio.sleep(1)  # start one request per second
        # Wait for every fetch before the session is closed; a failed
        # request now surfaces as an exception instead of hanging forever.
        await asyncio.gather(*tasks)


asyncio.run(main())
1. 如果不加入锁,每次运行的结果可能不一样。
2. 不能用多线程的threading 锁,得到的每次运行结果也有可能不一样
3. 用asyncio的锁要 加关键字 async
attrs() got an unexpected keyword argument 'eq'
李魔佛 发表了文章 • 0 个评论 • 376 次浏览 • 2020-11-12 22:42
Collecting attrs
Downloading https://files.pythonhosted.org ... y.whl (49kB)
100% |████████████████████████████████| 51kB 79kB/s
Installing collected packages: attrs
Found existing installation: attrs 18.2.0
Uninstalling attrs-18.2.0:
Successfully uninstalled attrs-18.2.0
Successfully installed attrs-20.3.0 查看全部
Collecting attrs
Downloading https://files.pythonhosted.org ... y.whl (49kB)
100% |████████████████████████████████| 51kB 79kB/s
Installing collected packages: attrs
Found existing installation: attrs 18.2.0
Uninstalling attrs-18.2.0:
Successfully uninstalled attrs-18.2.0
Successfully installed attrs-20.3.0
pyecharts绘图保存为图片 适用于ssh无头浏览器运行
李魔佛 发表了文章 • 0 个评论 • 329 次浏览 • 2020-11-04 22:27
make_snapshot(snapshot, bar.render(), f"data/{today}_cb.png", driver=driver)
在最后一行传入一个driver就可以了,这个driver使用phantomjs的实例。
# Script excerpt: build a pyecharts bar chart and save it as HTML and PNG.
# NOTE(review): result_dict, today, y_list, y_zg_list, bigger, smaller, avg,
# std, max_name, max_pct, min_name and min_pct are not defined in this
# excerpt; they must be provided by the surrounding code.
import os
from pyecharts.render import make_snapshot
from snapshot_selenium import snapshot
import pandas as pd  # not used in this excerpt
from pyecharts import options as opts
from pyecharts.charts import Bar
import sys
from selenium import webdriver
from pyecharts.commons.utils import JsCode
if sys.platform == 'win32':
    SELENIUM_PATH = r'C:\OneDrive\Tool\phantomjs-2.1.1-windows\phantomjs-2.1.1-windows\bin\phantomjs.exe'
    # On Windows no driver is created; make_snapshot receives driver=None.
    driver = None
else:
    SELENIUM_PATH = './phantomjs'
    # Elsewhere a PhantomJS driver is created from the local binary.
    driver = webdriver.PhantomJS(executable_path=SELENIUM_PATH)
bar = (
    Bar()
    .add_xaxis(list(result_dict .keys()))
    .add_yaxis(f"{today}-可转债价格分布", y_list, category_gap=3)
    .add_yaxis(f"{today}-正股价格分布", y_zg_list, category_gap=3)
    .set_series_opts(
        label_opts=opts.LabelOpts(is_show=True),
        axispointer_opts=opts.AxisPointerOpts(is_show=True))
    .set_global_opts(
        title_opts=opts.TitleOpts(title="可转债价格分布"),
        xaxis_opts=opts.AxisOpts(
            name="涨跌幅",
            is_show=True,
            name_rotate=30,
        ),
        # Text box with summary statistics, positioned at left=70%, top=20%.
        graphic_opts=[
            opts.GraphicGroup(
                graphic_item=opts.GraphicItem(
                    left="70%",
                    top="20%",
                ),
                children=[
                    opts.GraphicText(
                        graphic_item=opts.GraphicItem(
                            left="center",
                            top="middle",
                            z=100,
                        ),
                        graphic_textstyle_opts=opts.GraphicTextStyleOpts(
                            # JS expression: an array of lines joined with '\n'.
                            text=JsCode(
                                f"['涨幅>=0:{bigger}',"
                                f"'涨幅<0:{smaller}',"
                                f"'平均涨幅:{avg}%',"
                                f"'波动方差:{std}',"
                                f"'',"
                                f"'最大:{max_name} {max_pct}%',"
                                f"'最小:{min_name} {min_pct}%',"
                                "''].join('\\n')"
                            ),
                            font="14px Microsoft YaHei",
                            graphic_basicstyle_opts=opts.GraphicBasicStyleOpts(
                                fill="#333"
                            )
                        )
                    )
                ]
            )
        ],
    )
)
# Write the interactive HTML chart under data/.
bar.render(os.path.join('data', f"{today}_cb.html"))
# Render again (no path argument) and snapshot it to a PNG using the driver chosen above.
make_snapshot(snapshot, bar.render(), f"data/{today}_cb.png", driver=driver)
查看全部
make_snapshot(snapshot, bar.render(), f"data/{today}_cb.png", driver=driver)
在最后一行传入一个driver就可以了,这个driver使用phantomjs的实例。
import os
from pyecharts.render import make_snapshot
from snapshot_selenium import snapshot
import pandas as pd
from pyecharts import options as opts
from pyecharts.charts import Bar
import sys
from selenium import webdriver
from pyecharts.commons.utils import JsCode
# Build a grouped bar chart of the price distributions, save it as HTML and
# take a PNG snapshot of it.
#
# Relies on names defined outside this excerpt: result_dict, today, y_list,
# y_zg_list, bigger, smaller, avg, std, max_name, max_pct, min_name, min_pct.

# Choose the PhantomJS binary per platform.  On Windows no driver object is
# created, so make_snapshot() receives driver=None.
# NOTE(review): presumably the snapshot engine then falls back to its own
# default browser -- confirm against snapshot_selenium.
if sys.platform == 'win32':
    SELENIUM_PATH = r'C:\OneDrive\Tool\phantomjs-2.1.1-windows\phantomjs-2.1.1-windows\bin\phantomjs.exe'
    driver = None
else:
    SELENIUM_PATH = './phantomjs'
    driver = webdriver.PhantomJS(executable_path=SELENIUM_PATH)

bar = (
    Bar()
    .add_xaxis(list(result_dict.keys()))
    .add_yaxis(f"{today}-可转债价格分布", y_list, category_gap=3)
    .add_yaxis(f"{today}-正股价格分布", y_zg_list, category_gap=3)
    .set_series_opts(
        label_opts=opts.LabelOpts(is_show=True),
        axispointer_opts=opts.AxisPointerOpts(is_show=True))
    .set_global_opts(
        title_opts=opts.TitleOpts(title="可转债价格分布"),
        xaxis_opts=opts.AxisOpts(
            name="涨跌幅",
            is_show=True,
            name_rotate=30,
        ),
        # Summary statistics drawn as a text overlay on the chart.
        graphic_opts=[
            opts.GraphicGroup(
                graphic_item=opts.GraphicItem(
                    left="70%",
                    top="20%",
                ),
                children=[
                    opts.GraphicText(
                        graphic_item=opts.GraphicItem(
                            left="center",
                            top="middle",
                            z=100,
                        ),
                        graphic_textstyle_opts=opts.GraphicTextStyleOpts(
                            # The text is a JavaScript expression: an array
                            # of lines joined with newlines in the browser.
                            text=JsCode(
                                f"['涨幅>=0:{bigger}',"
                                f"'涨幅<0:{smaller}',"
                                f"'平均涨幅:{avg}%',"
                                f"'波动方差:{std}',"
                                f"'',"
                                f"'最大:{max_name} {max_pct}%',"
                                f"'最小:{min_name} {min_pct}%',"
                                "''].join('\\n')"
                            ),
                            font="14px Microsoft YaHei",
                            graphic_basicstyle_opts=opts.GraphicBasicStyleOpts(
                                fill="#333"
                            )
                        )
                    )
                ]
            )
        ],
    )
)

# Render once and snapshot that same file.  Calling bar.render() again with
# no argument would re-render the chart and leave an extra default-named
# HTML file in the working directory.
html_path = bar.render(os.path.join('data', f"{today}_cb.html"))
make_snapshot(snapshot, html_path, f"data/{today}_cb.png", driver=driver)
使用sshtunnel SSHTunnelForwarder 作为跳板连接mysql后一直卡住不退出
李魔佛 发表了文章 • 0 个评论 • 524 次浏览 • 2020-11-04 10:10
ssh_address_or_host=host,
ssh_port=port,
ssh_username=user,
ssh_password=password,
local_bind_address=('127.0.0.1', local_port),
remote_bind_address=(host, mysql_port)
)
# Open the SSH tunnel, then talk to MySQL through its local end.
server.start()
# Connect to the tunnel's local bind address rather than the remote host.
conn = pymysql.connect(
host='127.0.0.1',
port=local_port,
user=user,
password=password,
db='db_stock'
)
cursor = conn.cursor()
cursor.execute('select count(*) from tb_cb_index')
ret = cursor.fetchall()
print(ret)
# NOTE: conn is still open here.  The surrounding article reports that the
# program then never finishes and 'stop' is not printed; its fix is to call
# conn.close() before server.stop().
server.stop()
print('stop')
代码运行后并没有结束,或者说没有打印出stop的字符,而程序里已经使用了server.stop()关闭ssh的连接。
后面发现日志里面,mysql的连接没有断开,导致server没有被关闭,所以在上面的代码中加一句:
print(ret)
# Close the MySQL connection before stopping the tunnel; per the surrounding
# article, leaving it open is what kept server.stop() from completing.
conn.close()
server.stop()
print('stop')
把mysql的连接关闭,然后就可以把ssh的连接关闭,然后打印stop字符了。
查看全部
下面的代码运行后并没有结束,或者说没有打印出stop的字符,而程序里已经使用了server.stop()关闭ssh的连接:
server = SSHTunnelForwarder(
ssh_address_or_host=host,
ssh_port=port,
ssh_username=user,
ssh_password=password,
local_bind_address=('127.0.0.1', local_port),
remote_bind_address=(host, mysql_port)
)
# Open the SSH tunnel, then talk to MySQL through its local end.
server.start()
# Connect to the tunnel's local bind address rather than the remote host.
conn = pymysql.connect(
host='127.0.0.1',
port=local_port,
user=user,
password=password,
db='db_stock'
)
cursor = conn.cursor()
cursor.execute('select count(*) from tb_cb_index')
ret = cursor.fetchall()
print(ret)
# NOTE: conn is still open here.  The surrounding article reports that the
# program then never finishes and 'stop' is not printed; its fix is to call
# conn.close() before server.stop().
server.stop()
print('stop')
后面发现日志里面,mysql的连接没有断开,导致server没有被关闭,所以在上面的代码中加一句:
print(ret)
# Close the MySQL connection before stopping the tunnel; per the surrounding
# article, leaving it open is what kept server.stop() from completing.
conn.close()
server.stop()
print('stop')
把mysql的连接关闭,然后就可以把ssh的连接关闭,然后打印stop字符了。