如何使用控制台将动态加载数据刷新出来啊????

低调的哥哥 回复了问题 • 2 人关注 • 1 个回复 • 188 次浏览 • 2021-08-11 02:06 • 来自相关话题

Go的包管理比python烂得多,不知道为啥还要被吹捧

李魔佛 发表了文章 • 0 个评论 • 274 次浏览 • 2021-08-07 23:53 • 来自相关话题

1. 不兼容
GO111MODULE与GOPATH不兼容
 
2. 竟然要搞动系统的环境变量。
主要本人使用vim开发,编译运行都在shell底下,windows也是在cmd下跑的
 
3. 下载包的地址被墙,需要设置国内的地址。并且官方的包镜像也在github,其下载速度,你懂的。
蜗牛一样的。 查看全部
1. 不兼容
GO111MODULE与GOPATH不兼容
 
2. 竟然要搞动系统的环境变量。
主要本人使用vim开发,编译运行都在shell底下,windows也是在cmd下跑的
 
3. 下载包的地址被墙,需要设置国内的地址。并且官方的包镜像也在github,其下载速度,你懂的。
蜗牛一样的。

python 上传文件夹内图片到七牛,同时加入批量删除,单个删除

李魔佛 发表了文章 • 0 个评论 • 231 次浏览 • 2021-08-07 17:08 • 来自相关话题

先注册好七牛的账户,那都AK和SK两个key
 
然后把key写入到环境变量或者写到下面的python文件里面from qiniu import Auth, put_file,BucketManager,build_batch_delete
import os
import fire

access_key = os.getenv('qiniu_access_key')
secret_key = os.getenv('qiniu_secret_key')

bucket_name = '' # 你的空间名称

HOST ='[url]http://xximg.xxx.com/{}'[/url] # 可以不用填

TEMPLATE = '\n![{}]({})\n\n\n'

def upload(file,category=''):

    #构建鉴权对象
    q = Auth(access_key, secret_key)
    #要上传的空间

    #上传后保存的文件名
    key = category +'/' + os.path.split(file)[1]
    #生成上传 Token,可以指定过期时间等
    token = q.upload_token(bucket_name, key) # 永不过期

    #要上传文件的本地路径
    ret, info = put_file(token, key, file, version='v1') 
    print(ret)
    print(info)
    return HOST.format(ret['key'])

def bulk_upload(path,category=''):
    with open('qiniu_image.md','a+') as fp:
        for file in os.listdir(path):
            full_path = os.path.join(path,file)
            if os.path.isfile(full_path):

                host_url = upload(full_path,category)
                fp.write(TEMPLATE.format(host_url,host_url))


def get_file_info(prefix,limit = 10):
    q = Auth(access_key, secret_key)
    bucket = BucketManager(q)

    delimiter = None
    marker = None
    ret, eof, info = bucket.list(bucket_name, prefix, marker, limit, delimiter)

    # assert len(ret.get('items')) is not None
    url_list=[]
    for item in ret.get('items',):
        url_list.append(item['key'])
    # print(url_list)
    # print(len(url_list))
    return url_list,len(url_list)

def bulk_delete(prefix,limit=None):
    url_list,lens = get_file_info(prefix,limit=limit)
    q = Auth(access_key, secret_key)
    bucket = BucketManager(q)
    ops = build_batch_delete(bucket_name, url_list)
    ret, info = bucket.batch(ops)
    print(info)
    print(ret)

def delete_one(key):
    q = Auth(access_key, secret_key)
    #初始化BucketManager
    bucket = BucketManager(q)
    #你要测试的空间, 并且这个key在你空间中存在
    # key = 'python-logo.png'
    #删除bucket_name 中的文件 key
    ret, info = bucket.delete(bucket_name, key)
    print(info)
    print(ret)
    # assert ret == {}


def bulk_delete_ones(prefix):
    url_list,lens = get_file_info(prefix,limit=10)
    for url in url_list:
        delete_one(url)
        # print(url)

def main(path,category):

    if os.path.isdir(path):

         bulk_upload(path,category)
    elif os.path.isfile(path):
         upload(path,category)
    else:
         raise ValueError('文件不存在')

    get_file_info()
    bulk_delete('resource')
    bulk_delete_ones('resource')
    delete_one('resource/data_beauty.png')

if __name__ == '__main__':
    fire.Fire(main)



然后运行:
python main.py --path='C:\Photo' --category='person'
运行后会上传到七牛的虚拟目录 person目录下
 
如果要删除,bulk_delete批量删除某个前缀或者文件夹的  查看全部
先注册好七牛的账户,那都AK和SK两个key
 
然后把key写入到环境变量或者写到下面的python文件里面
from qiniu import Auth, put_file,BucketManager,build_batch_delete
import os
import fire

access_key = os.getenv('qiniu_access_key')
secret_key = os.getenv('qiniu_secret_key')

bucket_name = '' # 你的空间名称

HOST ='[url]http://xximg.xxx.com/{}'[/url] # 可以不用填

TEMPLATE = '\n![{}]({})\n\n\n'

def upload(file,category=''):

    #构建鉴权对象
    q = Auth(access_key, secret_key)
    #要上传的空间

    #上传后保存的文件名
    key = category +'/' + os.path.split(file)[1]
    #生成上传 Token,可以指定过期时间等
    token = q.upload_token(bucket_name, key) # 永不过期

    #要上传文件的本地路径
    ret, info = put_file(token, key, file, version='v1') 
    print(ret)
    print(info)
    return HOST.format(ret['key'])

def bulk_upload(path,category=''):
    with open('qiniu_image.md','a+') as fp:
        for file in os.listdir(path):
            full_path = os.path.join(path,file)
            if os.path.isfile(full_path):

                host_url = upload(full_path,category)
                fp.write(TEMPLATE.format(host_url,host_url))


def get_file_info(prefix,limit = 10):
    q = Auth(access_key, secret_key)
    bucket = BucketManager(q)

    delimiter = None
    marker = None
    ret, eof, info = bucket.list(bucket_name, prefix, marker, limit, delimiter)

    # assert len(ret.get('items')) is not None
    url_list=[]
    for item in ret.get('items',):
        url_list.append(item['key'])
    # print(url_list)
    # print(len(url_list))
    return url_list,len(url_list)

def bulk_delete(prefix,limit=None):
    url_list,lens = get_file_info(prefix,limit=limit)
    q = Auth(access_key, secret_key)
    bucket = BucketManager(q)
    ops = build_batch_delete(bucket_name, url_list)
    ret, info = bucket.batch(ops)
    print(info)
    print(ret)

def delete_one(key):
    q = Auth(access_key, secret_key)
    #初始化BucketManager
    bucket = BucketManager(q)
    #你要测试的空间, 并且这个key在你空间中存在
    # key = 'python-logo.png'
    #删除bucket_name 中的文件 key
    ret, info = bucket.delete(bucket_name, key)
    print(info)
    print(ret)
    # assert ret == {}


def bulk_delete_ones(prefix):
    url_list,lens = get_file_info(prefix,limit=10)
    for url in url_list:
        delete_one(url)
        # print(url)

def main(path,category):

    if os.path.isdir(path):

         bulk_upload(path,category)
    elif os.path.isfile(path):
         upload(path,category)
    else:
         raise ValueError('文件不存在')

    get_file_info()
    bulk_delete('resource')
    bulk_delete_ones('resource')
    delete_one('resource/data_beauty.png')

if __name__ == '__main__':
    fire.Fire(main)



然后运行:
python main.py --path='C:\Photo' --category='person'
运行后会上传到七牛的虚拟目录 person目录下
 
如果要删除,bulk_delete批量删除某个前缀或者文件夹的 

python pyecharts 多图叠加 bar和line叠加在一张图上

李魔佛 发表了文章 • 0 个评论 • 293 次浏览 • 2021-07-10 12:21 • 来自相关话题

 
先准备一个bar图
import pyecharts.options as opts
from pyecharts.charts import Bar, Line

x_data = ["1月", "2月", "3月", "4月", "5月", "6月", "7月", "8月", "9月", "10月", "11月", "12月"]

bar = (
Bar(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="蒸发量",
y_axis=[
2.0,
4.9,
7.0,
23.2,
25.6,
76.7,
135.6,
162.2,
32.6,
20.0,
6.4,
3.3,
],
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="降水量",
y_axis=[
2.6,
5.9,
9.0,
26.4,
28.7,
70.7,
175.6,
182.2,
48.7,
18.8,
6.0,
2.3,
],
label_opts=opts.LabelOpts(is_show=False),
)
.extend_axis(
yaxis=opts.AxisOpts(
name="温度",
type_="value",
min_=0,
max_=25,
interval=5,
axislabel_opts=opts.LabelOpts(formatter="{value} °C"),
)
)
.set_global_opts(
tooltip_opts=opts.TooltipOpts(
is_show=True, trigger="axis", axis_pointer_type="cross"
),
xaxis_opts=opts.AxisOpts(
type_="category",
axispointer_opts=opts.AxisPointerOpts(is_show=True, type_="shadow"),
),
yaxis_opts=opts.AxisOpts(
name="水量",
type_="value",
min_=0,
max_=250,
interval=50,
axislabel_opts=opts.LabelOpts(formatter="{value} ml"),
axistick_opts=opts.AxisTickOpts(is_show=True),
splitline_opts=opts.SplitLineOpts(is_show=True),
),
)
)
再加一个折线图
line = (
Line()
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="平均温度",
yaxis_index=1,
y_axis=[2.0, 2.2, 3.3, 4.5, 6.3, 10.2, 20.3, 23.4, 23.0, 16.5, 12.0, 6.2],
label_opts=opts.LabelOpts(is_show=False),
)
)
然后使用overlap 函数叠加在一起
 
bar.overlap(line).render_notebook()





  查看全部
 
先准备一个bar图
import pyecharts.options as opts
from pyecharts.charts import Bar, Line

x_data = ["1月", "2月", "3月", "4月", "5月", "6月", "7月", "8月", "9月", "10月", "11月", "12月"]

bar = (
Bar(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="蒸发量",
y_axis=[
2.0,
4.9,
7.0,
23.2,
25.6,
76.7,
135.6,
162.2,
32.6,
20.0,
6.4,
3.3,
],
label_opts=opts.LabelOpts(is_show=False),
)
.add_yaxis(
series_name="降水量",
y_axis=[
2.6,
5.9,
9.0,
26.4,
28.7,
70.7,
175.6,
182.2,
48.7,
18.8,
6.0,
2.3,
],
label_opts=opts.LabelOpts(is_show=False),
)
.extend_axis(
yaxis=opts.AxisOpts(
name="温度",
type_="value",
min_=0,
max_=25,
interval=5,
axislabel_opts=opts.LabelOpts(formatter="{value} °C"),
)
)
.set_global_opts(
tooltip_opts=opts.TooltipOpts(
is_show=True, trigger="axis", axis_pointer_type="cross"
),
xaxis_opts=opts.AxisOpts(
type_="category",
axispointer_opts=opts.AxisPointerOpts(is_show=True, type_="shadow"),
),
yaxis_opts=opts.AxisOpts(
name="水量",
type_="value",
min_=0,
max_=250,
interval=50,
axislabel_opts=opts.LabelOpts(formatter="{value} ml"),
axistick_opts=opts.AxisTickOpts(is_show=True),
splitline_opts=opts.SplitLineOpts(is_show=True),
),
)
)

再加一个折线图
line = (
Line()
.add_xaxis(xaxis_data=x_data)
.add_yaxis(
series_name="平均温度",
yaxis_index=1,
y_axis=[2.0, 2.2, 3.3, 4.5, 6.3, 10.2, 20.3, 23.4, 23.0, 16.5, 12.0, 6.2],
label_opts=opts.LabelOpts(is_show=False),
)
)

然后使用overlap 函数叠加在一起
 
bar.overlap(line).render_notebook()


Selection_008.png

 

python rabbitmq 连接时无法正常发送和接受消息

李魔佛 发表了文章 • 0 个评论 • 354 次浏览 • 2021-06-28 19:44 • 来自相关话题

用的是有密码的连接:auth = pika.PlainCredentials(user,password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host,port,'/',auth))使用上面的这个连接方式,消费者一只等待生产者生产数据,而生产数据者发出消息后,也无法正常发给消费者。
而在页面中其实是可以看到有对应的消息的。
 
后面发行上面的连接方式是由问题的,在于'/' 参数问题,因为默认参数的位置关系,‘/’ 并不是赋值给了virtual_host , 而是另外的参数。 所以解决问题的方法就是把每个参数的形参也写上去:auth = pika.PlainCredentials(user,password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=host,port=port,virtual_host='/',credentials=auth)) 
PS: 后面经过实际调试,原理是git的自带终端窗口的问题,用cmd命令行下面就没有这个问题。 查看全部
用的是有密码的连接:
auth = pika.PlainCredentials(user,password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host,port,'/',auth))
使用上面的这个连接方式,消费者一只等待生产者生产数据,而生产数据者发出消息后,也无法正常发给消费者。
而在页面中其实是可以看到有对应的消息的。
 
后面发行上面的连接方式是由问题的,在于'/' 参数问题,因为默认参数的位置关系,‘/’ 并不是赋值给了virtual_host , 而是另外的参数。 所以解决问题的方法就是把每个参数的形参也写上去:
auth = pika.PlainCredentials(user,password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=host,port=port,virtual_host='/',credentials=auth))
 
PS: 后面经过实际调试,原理是git的自带终端窗口的问题,用cmd命令行下面就没有这个问题。

flask自定义所有错误返回json格式

李魔佛 发表了文章 • 0 个评论 • 257 次浏览 • 2021-06-13 19:05 • 来自相关话题

使用app.register_error_andler绑定时,把debug=True去掉才可以。
使用app.register_error_andler绑定时,把debug=True去掉才可以。

pandas 合并两个表,如何保留第一个表的索引?

李魔佛 发表了文章 • 0 个评论 • 478 次浏览 • 2021-05-29 22:17 • 来自相关话题

df1 数据
tickerBond closePriceBond bondPremRatio secShortNameBond tickerEqu \
secID
110066 110066 199.94 -1.2442 盛屯转债 600711
110067 110067 119.53 25.9204 华安转债 600909
113021 113021 105.81 45.0989 中信转债 601998
113024 113024 101.94 36.6668 核建转债 601611
113025 113025 129.16 0.0409 明泰转债 601677
df2 数据
ROE tickerEqu
0 2.642931 600711
1 4.425438 600909
2 6.259092 601998
3 4.432315 601611
4 6.454054 601677
如果按照 pd.merge(df1,df2,on='tickerEqu') ,按照列 tickerEqu 进行合并,这样会导致最后合成的新的列的索性重构,变成 0,1,2,3 这种的。

有什么办法可以保留 df1 的索引? 用 join 的话会报错,因为 df2 的索引和 df1 匹配不上。
 
先 df1 = df1.reset_index(),合并之后再把 secID 那一列设为 index 。 查看全部
df1 数据
 tickerBond  closePriceBond  bondPremRatio secShortNameBond tickerEqu  \
secID
110066 110066 199.94 -1.2442 盛屯转债 600711
110067 110067 119.53 25.9204 华安转债 600909
113021 113021 105.81 45.0989 中信转债 601998
113024 113024 101.94 36.6668 核建转债 601611
113025 113025 129.16 0.0409 明泰转债 601677

df2 数据
        ROE tickerEqu
0 2.642931 600711
1 4.425438 600909
2 6.259092 601998
3 4.432315 601611
4 6.454054 601677

如果按照 pd.merge(df1,df2,on='tickerEqu') ,按照列 tickerEqu 进行合并,这样会导致最后合成的新的列的索性重构,变成 0,1,2,3 这种的。

有什么办法可以保留 df1 的索引? 用 join 的话会报错,因为 df2 的索引和 df1 匹配不上。
 
先 df1 = df1.reset_index(),合并之后再把 secID 那一列设为 index 。

pip install peewee : AttributeError: 'str' object has no attribute 'decode'

李魔佛 发表了文章 • 0 个评论 • 481 次浏览 • 2021-04-26 23:58 • 来自相关话题

 
ERROR: Command errored out with exit status 1:
command: 'C:\anaconda\python.exe' -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'C:\\Users\\xda\\AppData\
\Local\\Temp\\pip-install-ftotbzih\\peewee\\setup.py'"'"'; __file__='"'"'C:\\Users\\xda\\AppData\\Local\\Temp\\pip-insta
ll-ftotbzih\\peewee\\setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"
'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base 'C:\Users\xda\AppData\Lo
cal\Temp\pip-pip-egg-info-8ou7yi3i'
cwd: C:\Users\xda\AppData\Local\Temp\pip-install-ftotbzih\peewee\
Complete output (15 lines):
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\xda\AppData\Local\Temp\pip-install-ftotbzih\peewee\setup.py", line 99, in <module>
elif not _have_sqlite_extension_support():
File "C:\Users\xda\AppData\Local\Temp\pip-install-ftotbzih\peewee\setup.py", line 76, in _have_sqlite_extension_su
pport
compiler.compile([src_file], output_dir=tmp_dir),
File "C:\anaconda\lib\distutils\_msvccompiler.py", line 327, in compile
self.initialize()
File "C:\anaconda\lib\distutils\_msvccompiler.py", line 224, in initialize
vc_env = _get_vc_env(plat_spec)
File "C:\anaconda\lib\site-packages\setuptools\msvc.py", line 314, in msvc14_get_vc_env
return _msvc14_get_vc_env(plat_spec)
File "C:\anaconda\lib\site-packages\setuptools\msvc.py", line 273, in _msvc14_get_vc_env
out = subprocess.check_output(
AttributeError: 'str' object has no attribute 'decode'
----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
同样的编码问题,同样的解决方法:
找到文件msvc.py
大概在276行:
out = subprocess.check_output(
'cmd /u /c "{}" {} && set'.format(vcvarsall, plat_spec),
stderr=subprocess.STDOUT,
)
# ).decode('utf-16le', errors='replace')把decode的部分注释掉即可
  查看全部
 
    ERROR: Command errored out with exit status 1:
command: 'C:\anaconda\python.exe' -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'C:\\Users\\xda\\AppData\
\Local\\Temp\\pip-install-ftotbzih\\peewee\\setup.py'"'"'; __file__='"'"'C:\\Users\\xda\\AppData\\Local\\Temp\\pip-insta
ll-ftotbzih\\peewee\\setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"
'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base 'C:\Users\xda\AppData\Lo
cal\Temp\pip-pip-egg-info-8ou7yi3i'
cwd: C:\Users\xda\AppData\Local\Temp\pip-install-ftotbzih\peewee\
Complete output (15 lines):
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\xda\AppData\Local\Temp\pip-install-ftotbzih\peewee\setup.py", line 99, in <module>
elif not _have_sqlite_extension_support():
File "C:\Users\xda\AppData\Local\Temp\pip-install-ftotbzih\peewee\setup.py", line 76, in _have_sqlite_extension_su
pport
compiler.compile([src_file], output_dir=tmp_dir),
File "C:\anaconda\lib\distutils\_msvccompiler.py", line 327, in compile
self.initialize()
File "C:\anaconda\lib\distutils\_msvccompiler.py", line 224, in initialize
vc_env = _get_vc_env(plat_spec)
File "C:\anaconda\lib\site-packages\setuptools\msvc.py", line 314, in msvc14_get_vc_env
return _msvc14_get_vc_env(plat_spec)
File "C:\anaconda\lib\site-packages\setuptools\msvc.py", line 273, in _msvc14_get_vc_env
out = subprocess.check_output(
AttributeError: 'str' object has no attribute 'decode'
----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.

同样的编码问题,同样的解决方法:
找到文件msvc.py
大概在276行:
        out = subprocess.check_output(
'cmd /u /c "{}" {} && set'.format(vcvarsall, plat_spec),
stderr=subprocess.STDOUT,
)
# ).decode('utf-16le', errors='replace')
把decode的部分注释掉即可
 

百度英语中文语音识别为文字服务 python demo【使用requests重写官方demo】

李魔佛 发表了文章 • 0 个评论 • 560 次浏览 • 2021-04-25 11:49 • 来自相关话题

 
官方使用的稍微底层的urllib写的,用过了requests库的人看着不习惯。故重写之,并做了封装。
官方demo:
https://github.com/Baidu-AIP/speech-demo
# -*- coding: utf-8 -*-
# @Time : 2021/4/24 20:50
# @File : baidu_voice_service.py
# @Author : Rocky C@www.30daydo.com

import os
import requests
import sys
import pickle
sys.path.append('..')
from config import API_KEY,SECRET_KEY
from base64 import b64encode
from pathlib import PurePath


BASE = PurePath(__file__).parent

# 需要识别的文件
AUDIO_FILE = r'C:\OtherGit\speech-demo\rest-api-asr\python\audio\2.m4a' # 只支持 pcm/wav/amr 格式,极速版额外支持m4a 格式
# 文件格式
FORMAT = AUDIO_FILE[-3:] # 文件后缀只支持 pcm/wav/amr 格式,极速版额外支持m4a 格式

CUID = '24057753'
# 采样率
RATE = 16000 # 固定值


ASR_URL = 'http://vop.baidu.com/server_api'

#测试自训练平台需要打开以下信息, 自训练平台模型上线后,您会看见 第二步:“”获取专属模型参数pid:8001,modelid:1234”,按照这个信息获取 dev_pid=8001,lm_id=1234
'''
http://vop.baidu.com/server_api
1537 普通话(纯中文识别) 输入法模型 有标点 支持自定义词库
1737 英语 英语模型 无标点 不支持自定义词库
1637 粤语 粤语模型 有标点 不支持自定义词库
1837 四川话 四川话模型 有标点 不支持自定义词库
1936 普通话远场
'''
DEV_PID = 1737

SCOPE = 'brain_enhanced_asr' # 有此scope表示有asr能力,没有请在网页里开通极速版


class DemoError(Exception):
pass


TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'

def fetch_token():

params = {'grant_type': 'client_credentials',
'client_id': API_KEY,
'client_secret': SECRET_KEY}
r = requests.post(
url=TOKEN_URL,
data=params
)

result = r.json()
if ('access_token' in result.keys() and 'scope' in result.keys()):
if SCOPE and (not SCOPE in result['scope'].split(' ')): # SCOPE = False 忽略检查
raise DemoError('scope is not correct')

return result['access_token']

else:
raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')


""" TOKEN end """

def dump_token(token):
with open(os.path.join(BASE,'token.pkl'),'wb') as fp:
pickle.dump({'token':token},fp)

def load_token(filename):

if not os.path.exists(filename):
token=fetch_token()
dump_token(token)
return token
else:
with open(filename,'rb') as fp:
token = pickle.load(fp)
return token['token']

def recognize_service(token,filename):

with open(filename, 'rb') as speech_file:
speech_data = speech_file.read()

length = len(speech_data)
if length == 0:
raise DemoError('file %s length read 0 bytes' % AUDIO_FILE)

b64_data = b64encode(speech_data)
params = {'cuid': CUID, 'token': token, 'dev_pid': DEV_PID,'speech':b64_data,'len':length,'format':FORMAT,'rate':RATE,'channel':1}

headers = {
'Content-Type':'application/json',
}
r = requests.post(url=ASR_URL,json=params,headers=headers)
return r.json()


def main():
filename = 'token.pkl'
token = load_token(filename)
result = recognize_service(token,AUDIO_FILE)
print(result['result'])

if __name__ == '__main__':
main()



只需要替换自己的key就可以使用。
自己录了几段英文测试了下,还是蛮准的。 查看全部
 
官方使用的稍微底层的urllib写的,用过了requests库的人看着不习惯。故重写之,并做了封装。
官方demo:
https://github.com/Baidu-AIP/speech-demo
# -*- coding: utf-8 -*-
# @Time : 2021/4/24 20:50
# @File : baidu_voice_service.py
# @Author : Rocky C@www.30daydo.com

import os
import requests
import sys
import pickle
sys.path.append('..')
from config import API_KEY,SECRET_KEY
from base64 import b64encode
from pathlib import PurePath


BASE = PurePath(__file__).parent

# 需要识别的文件
AUDIO_FILE = r'C:\OtherGit\speech-demo\rest-api-asr\python\audio\2.m4a' # 只支持 pcm/wav/amr 格式,极速版额外支持m4a 格式
# 文件格式
FORMAT = AUDIO_FILE[-3:] # 文件后缀只支持 pcm/wav/amr 格式,极速版额外支持m4a 格式

CUID = '24057753'
# 采样率
RATE = 16000 # 固定值


ASR_URL = 'http://vop.baidu.com/server_api'

#测试自训练平台需要打开以下信息, 自训练平台模型上线后,您会看见 第二步:“”获取专属模型参数pid:8001,modelid:1234”,按照这个信息获取 dev_pid=8001,lm_id=1234
'''
http://vop.baidu.com/server_api
1537 普通话(纯中文识别) 输入法模型 有标点 支持自定义词库
1737 英语 英语模型 无标点 不支持自定义词库
1637 粤语 粤语模型 有标点 不支持自定义词库
1837 四川话 四川话模型 有标点 不支持自定义词库
1936 普通话远场
'''
DEV_PID = 1737

SCOPE = 'brain_enhanced_asr' # 有此scope表示有asr能力,没有请在网页里开通极速版


class DemoError(Exception):
pass


TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'

def fetch_token():

params = {'grant_type': 'client_credentials',
'client_id': API_KEY,
'client_secret': SECRET_KEY}
r = requests.post(
url=TOKEN_URL,
data=params
)

result = r.json()
if ('access_token' in result.keys() and 'scope' in result.keys()):
if SCOPE and (not SCOPE in result['scope'].split(' ')): # SCOPE = False 忽略检查
raise DemoError('scope is not correct')

return result['access_token']

else:
raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')


""" TOKEN end """

def dump_token(token):
with open(os.path.join(BASE,'token.pkl'),'wb') as fp:
pickle.dump({'token':token},fp)

def load_token(filename):

if not os.path.exists(filename):
token=fetch_token()
dump_token(token)
return token
else:
with open(filename,'rb') as fp:
token = pickle.load(fp)
return token['token']

def recognize_service(token,filename):

with open(filename, 'rb') as speech_file:
speech_data = speech_file.read()

length = len(speech_data)
if length == 0:
raise DemoError('file %s length read 0 bytes' % AUDIO_FILE)

b64_data = b64encode(speech_data)
params = {'cuid': CUID, 'token': token, 'dev_pid': DEV_PID,'speech':b64_data,'len':length,'format':FORMAT,'rate':RATE,'channel':1}

headers = {
'Content-Type':'application/json',
}
r = requests.post(url=ASR_URL,json=params,headers=headers)
return r.json()


def main():
filename = 'token.pkl'
token = load_token(filename)
result = recognize_service(token,AUDIO_FILE)
print(result['result'])

if __name__ == '__main__':
main()



只需要替换自己的key就可以使用。
自己录了几段英文测试了下,还是蛮准的。

本地代码 搜索脚本 python实现

李魔佛 发表了文章 • 0 个评论 • 589 次浏览 • 2021-04-14 19:34 • 来自相关话题

本来用find+grep可以搞定的,不过如果搜索多个路径和多个规则,写正则可能写过不来
find . -type f -name "*.py" | xargs grep "redis"
上面语句是在py文件中查找redis的字符。
 
 不过如果要在指定多个位置查找,可能要拼接几个管道,并且如果我要几个字符的关系是并集,就是多个关键字要在文本中同时出现,而且不一定在同一行,所以也不好写。
 
所以写了个python脚本,也方便在centos下运行
# -*- coding: utf-8 -*-
# @Time : 2021/4/14 1:46
# @File : search_string_in_folder.py
# @Author : Rocky C@www.30daydo.com

'''
搜索代码脚本
'''
import fire
import glob
import re

# TODO 用PYQT重写一个

PATH_LIST = [r'C:\git\\',r'C:\OtherGit\\',r'C:\OneDrive\viewed_code\\']
POST_FIX = 'py' # 后缀文件
# 关键词
WORDS=[]

EXCLUDE_PATH=[r'C:\OtherGit\cpython']

DEBUG = True

class FileSearcher:

def __init__(self,kw):
self.root_path_list = PATH_LIST
self.default_coding ='utf-8'
self.exception_handle_coding='gbk'
self.kw=[]
if not isinstance(kw,tuple):
kw=(kw,)

for k in kw:
k=k.strip()
self.kw.append(k)

def search(self,file,encoding):
match_dict = dict()

for w in self.kw:
match_dict.setdefault(w, False)

line_number = 0
line_list=list()
with open(file, 'r', encoding=encoding) as fp:

while 1:
try:
line = fp.readline()

except UnicodeDecodeError as e:

if DEBUG:
print(f'Error coding in file {file}')
print(e)

return None,None,None

except Exception as e:
if DEBUG:
print(f'Error in file {file}')
print(e)
break

if not line:
break

line = line.strip()
if not line:
continue

for w in self.kw:
m=re.search(w,line,re.IGNORECASE)
if m:
match_dict.update({w:True})
line_list.append(line_number)

line_number+=1

return True,match_dict.copy(),line_list.copy()

def print_match_result(self,file,line_list,encoding):

with open(file, 'r', encoding=encoding) as fp:
line_number = 0
while 1:
try:
line = fp.readline()
except Exception as e:
if DEBUG:
print(f'Error in file {file}')
print(e)
break

if not line:
break
line=line.strip()

if not line:
continue

if line_number in line_list:
print(f'{file} :: {line_number} ====>\n {line[:50]}\n')

line_number += 1

def run(self):
for path in self.root_path_list:

search_path=path+'**/*.'+POST_FIX

for file in glob.iglob(search_path,recursive=True):

for ex_path in EXCLUDE_PATH:
ex_path=ex_path.replace('\\','')
temp_file=file.replace('\\','')
if ex_path in temp_file:
continue

use_encoding=self.default_coding
encode_proper,match_dict,line_list=self.search(file,use_encoding)

if not encode_proper:
use_encoding = self.exception_handle_coding
encode_proper,match_dict,line_list=self.search(file, use_encoding)

if match_dict is not None and len(match_dict)>0 and all(match_dict.values()):
# print(match_dict.values())
self.print_match_result(file,line_list,use_encoding)
# print(line_list)


def test_error_file():
path=r'C:\git\CodePool\example-code\19-dyn-attr-prop\oscon\schedule2.py'
with open(path,'r',encoding='utf8') as fp:
while 1:
x=fp.readline()
if not x:
break
print(x)

def main(kw):
app = FileSearcher(kw)
app.run()

if __name__ == '__main__':
fire.Fire(main)

运行: python main.py --kw=asyncio,gather
 





  查看全部
本来用find+grep可以搞定的,不过如果搜索多个路径和多个规则,写正则可能写过不来
find . -type f -name "*.py" | xargs grep "redis"

上面语句是在py文件中查找redis的字符。
 
 不过如果要在指定多个位置查找,可能要拼接几个管道,并且如果我要几个字符的关系是并集,就是多个关键字要在文本中同时出现,而且不一定在同一行,所以也不好写。
 
所以写了个python脚本,也方便在centos下运行
# -*- coding: utf-8 -*-
# @Time : 2021/4/14 1:46
# @File : search_string_in_folder.py
# @Author : Rocky C@www.30daydo.com

'''
搜索代码脚本
'''
import fire
import glob
import re

# TODO 用PYQT重写一个

PATH_LIST = [r'C:\git\\',r'C:\OtherGit\\',r'C:\OneDrive\viewed_code\\']
POST_FIX = 'py' # 后缀文件
# 关键词
WORDS=[]

EXCLUDE_PATH=[r'C:\OtherGit\cpython']

DEBUG = True

class FileSearcher:

def __init__(self,kw):
self.root_path_list = PATH_LIST
self.default_coding ='utf-8'
self.exception_handle_coding='gbk'
self.kw=[]
if not isinstance(kw,tuple):
kw=(kw,)

for k in kw:
k=k.strip()
self.kw.append(k)

def search(self,file,encoding):
match_dict = dict()

for w in self.kw:
match_dict.setdefault(w, False)

line_number = 0
line_list=list()
with open(file, 'r', encoding=encoding) as fp:

while 1:
try:
line = fp.readline()

except UnicodeDecodeError as e:

if DEBUG:
print(f'Error coding in file {file}')
print(e)

return None,None,None

except Exception as e:
if DEBUG:
print(f'Error in file {file}')
print(e)
break

if not line:
break

line = line.strip()
if not line:
continue

for w in self.kw:
m=re.search(w,line,re.IGNORECASE)
if m:
match_dict.update({w:True})
line_list.append(line_number)

line_number+=1

return True,match_dict.copy(),line_list.copy()

def print_match_result(self,file,line_list,encoding):

with open(file, 'r', encoding=encoding) as fp:
line_number = 0
while 1:
try:
line = fp.readline()
except Exception as e:
if DEBUG:
print(f'Error in file {file}')
print(e)
break

if not line:
break
line=line.strip()

if not line:
continue

if line_number in line_list:
print(f'{file} :: {line_number} ====>\n {line[:50]}\n')

line_number += 1

def run(self):
for path in self.root_path_list:

search_path=path+'**/*.'+POST_FIX

for file in glob.iglob(search_path,recursive=True):

for ex_path in EXCLUDE_PATH:
ex_path=ex_path.replace('\\','')
temp_file=file.replace('\\','')
if ex_path in temp_file:
continue

use_encoding=self.default_coding
encode_proper,match_dict,line_list=self.search(file,use_encoding)

if not encode_proper:
use_encoding = self.exception_handle_coding
encode_proper,match_dict,line_list=self.search(file, use_encoding)

if match_dict is not None and len(match_dict)>0 and all(match_dict.values()):
# print(match_dict.values())
self.print_match_result(file,line_list,use_encoding)
# print(line_list)


def test_error_file():
path=r'C:\git\CodePool\example-code\19-dyn-attr-prop\oscon\schedule2.py'
with open(path,'r',encoding='utf8') as fp:
while 1:
x=fp.readline()
if not x:
break
print(x)

def main(kw):
app = FileSearcher(kw)
app.run()

if __name__ == '__main__':
fire.Fire(main)

运行: python main.py --kw=asyncio,gather
 

mQm5aIvTh1.png

 

判读一个函数是不是协程

李魔佛 发表了文章 • 0 个评论 • 390 次浏览 • 2021-04-09 20:27 • 来自相关话题

传入的是函数名,不需要加入括号:
def check_coroutine(fun):
if iscoroutinefunction(fun):
print('是协程')
else:
print('不是协程')

async def visit_web():
browser = await pyppeteer.launch(
{'headless': False,
'userDataDir': UserDataDir,
'defaultViewport': {'width': 1800, 'height': 1000},
'ignoreDefaultArgs':True,
}
)
page = await browser.newPage()

# 可以在launch下配置
# await page.setViewport({
# "width": 1900,
# "height": 1020
# })


# 先执行下面的JS 再去goto
await page.evaluate(
'''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
# await page.screenshot({'path': 'test.png', 'fullPage': True})
# await page.pdf({'path': 'test.pdf'})
# await asyncio.sleep(5)

await page.goto(url=URL)

# 这里的js是异步的写法
dimensions = await page.evaluate(
'''
()=>{
return {
width:document.documentElement.clientWidth,
height:document.documentElement.clientHeight,
deviceScaleFactor_:window.devicePixelRatio,
}
}
'''
)

result = await page.evaluate(
'''
()=>{
var title = document.title;
return {title:title};
}
'''

)


await browser.close()

调用:
check_coroutine(visit_web)注意,上面不能用visit_web()

  查看全部
传入的是函数名,不需要加入括号:
def check_coroutine(fun):
if iscoroutinefunction(fun):
print('是协程')
else:
print('不是协程')

async def visit_web():
browser = await pyppeteer.launch(
{'headless': False,
'userDataDir': UserDataDir,
'defaultViewport': {'width': 1800, 'height': 1000},
'ignoreDefaultArgs':True,
}
)
page = await browser.newPage()

# 可以在launch下配置
# await page.setViewport({
# "width": 1900,
# "height": 1020
# })


# 先执行下面的JS 再去goto
await page.evaluate(
'''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
# await page.screenshot({'path': 'test.png', 'fullPage': True})
# await page.pdf({'path': 'test.pdf'})
# await asyncio.sleep(5)

await page.goto(url=URL)

# 这里的js是异步的写法
dimensions = await page.evaluate(
'''
()=>{
return {
width:document.documentElement.clientWidth,
height:document.documentElement.clientHeight,
deviceScaleFactor_:window.devicePixelRatio,
}
}
'''
)

result = await page.evaluate(
'''
()=>{
var title = document.title;
return {title:title};
}
'''

)


await browser.close()

调用:
check_coroutine(visit_web)
注意,上面不能用visit_web()

 

转换很多逗号的,,,,,,,,, JS的数组为python列表

李魔佛 发表了文章 • 0 个评论 • 452 次浏览 • 2021-03-29 18:54 • 来自相关话题

不知道JS的写法就是这样还是这样的,一个列表可以这么写
var arr = [,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,1,2,3,4,5] 前面的逗号就是没有数据,None或者0.
然后JS的代码可以不填充任何数据。python要把它转为list,要怎么做的?
 
有2个方法:
 
1. 最简单,因为,,的意思是0,0, 那么我们可以把两个逗号替换成0,0,
但是如果前面的逗号数是单数,比如是3个逗号,
arr=[,,,1,2,3]
直接替换2个逗号为0,0,的话,结果是0,0,,1,2,3
结果也不对。
多了一对逗号
然后可以直接再替换一次,, 把两个的地方替换为1个,
 
2. 使用finditer找出每个多余2个逗号的起始和结束,然后替换为0, 即可。
for m in re.finditer(',{2,}'):
    start=m.start()
    end=m.end()
     查看全部
不知道JS的写法就是这样还是这样的,一个列表可以这么写
var arr = [,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,1,2,3,4,5] 前面的逗号就是没有数据,None或者0.
然后JS的代码可以不填充任何数据。python要把它转为list,要怎么做的?
 
有2个方法:
 
1. 最简单,因为,,的意思是0,0, 那么我们可以把两个逗号替换成0,0,
但是如果前面的逗号数是单数,比如是3个逗号,
arr=[,,,1,2,3]
直接替换2个逗号为0,0,的话,结果是0,0,,1,2,3
结果也不对。
多了一对逗号
然后可以直接再替换一次,, 把两个的地方替换为1个,
 
2. 使用finditer找出每个多余2个逗号的起始和结束,然后替换为0, 即可。
for m in re.finditer(',{2,}'):
    start=m.start()
    end=m.end()
    

python 转换excel数据,适配flourish数据格式

李魔佛 发表了文章 • 0 个评论 • 580 次浏览 • 2021-02-20 00:28 • 来自相关话题

flourish可视化网站要求excel的时间是按列排的,也就是我有1000个数据,那么也就需要1000列,这个和dataframe的默认数据是转置的,也就是需要把dataframe的行变成列。
 
而在数据量很大的情况下,pandas的xlwt是不支持265行以上的,所以需要用xlsxwriter这个库,通过手动转换
 
 
import xlsxwriter #导入模块
workbook = xlsxwriter.Workbook('new_people.xlsx') #新建excel表
worksheet = workbook.add_worksheet('sheet1') #新建sheet(sheet的名称为"sheet1")
把行列重新写入。
for index,item in df.iterrows():
date=item['上市日期']
count=item['申购人数']
date=date.replace(' 00:00:00','')
worksheet.write(0,index,date)
worksheet.write(1,index,count)

workbook.close()
index就是列数,不断地写在第一行和第二行,就可以达到所要的需求了。
  查看全部
flourish可视化网站要求excel的时间是按列排的,也就是我有1000个数据,那么也就需要1000列,这个和dataframe的默认数据是转置的,也就是需要把dataframe的行变成列。
 
而在数据量很大的情况下,pandas的xlwt是不支持265行以上的,所以需要用xlsxwriter这个库,通过手动转换
 
 
import xlsxwriter   #导入模块
workbook = xlsxwriter.Workbook('new_people.xlsx') #新建excel表
worksheet = workbook.add_worksheet('sheet1') #新建sheet(sheet的名称为"sheet1")

把行列重新写入。
for index,item in df.iterrows():
date=item['上市日期']
count=item['申购人数']
date=date.replace(' 00:00:00','')
worksheet.write(0,index,date)
worksheet.write(1,index,count)

workbook.close()

index就是列数,不断地写在第一行和第二行,就可以达到所要的需求了。
 

安装nodejs后新增的python把原来的python版本覆盖了

李魔佛 发表了文章 • 0 个评论 • 1241 次浏览 • 2021-01-29 14:58 • 来自相关话题

如果安装nodejs最后勾选了python环境,系统默认帮你装上最新的python版本,还自动把环境变量帮你加上,真是贴心。
 解决办法:
win10: 打开环境变量,把第一个python39或者类似字样的环境变量往下移,最好移到最后。
如果安装nodejs最后勾选了python环境,系统默认帮你装上最新的python版本,还自动把环境变量帮你加上,真是贴心。
 解决办法:
win10: 打开环境变量,把第一个python39或者类似字样的环境变量往下移,最好移到最后。

python解析windows日志文件,查询服务器是否被人攻击

李魔佛 发表了文章 • 0 个评论 • 1520 次浏览 • 2021-01-17 23:49 • 来自相关话题

最近大致浏览了下windows server的日志记录,发现有不少的异地IP进行了登录尝试,而且有部分是登录成功的,但不确定是否本人自己登陆,所以借助python,对日志进行解析,并根据IP查询其远程物理地址。
 
最终效果:






【MD,老毛子就是天天在扫描,爆破密码,即使改了端口还是在枚举】
 
大致代码如下:import mmap
import contextlib
from Evtx.Evtx import FileHeader
from Evtx.Views import evtx_file_xml_view
from xml.dom import minidom
from ip_convertor import IP
import re

class WindowsLogger():

def __init__(self,path):
self.path = path
self.formator = 'IP:{:10}\tlocation:{:20}\tUser:{:15}\tProcess:{}'

def read_file(self):
with open(self.path,'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ)) as buf:
fh = FileHeader(buf,0)
return fh

return None

def parse_log_detail(self,filteID):
with open(self.path,'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ)) as buf:
fh = FileHeader(buf,0)
for xml, record in evtx_file_xml_view(fh):
#只输出事件ID为4624的内容
# InterestEvent(xml,4624)
for IpAddress,ip,targetUsername,ProcessName in self.filter_event(xml,filteID):
print(self.formator.format(IpAddress,ip,targetUsername,ProcessName))

# 过滤掉不需要的事件,输出感兴趣的事件
def filter_event(self,xml,EventID,use_filter=True):
xmldoc = minidom.parseString(xml)
# 获取EventID节点的事件ID
collections = xmldoc.documentElement
events=xmldoc.getElementsByTagName('Event')
for evt in events:
eventId = evt.getElementsByTagName('EventID')[0].childNodes[0].data
time_create = evt.getElementsByTagName('TimeCreated')[0].getAttribute('SystemTime')
eventData = evt.getElementsByTagName('EventData')[0]

for data in eventData.getElementsByTagName('Data'):
if data.getAttribute('Name')=='IpAddress':
IpAddress=data.childNodes[0].data

if data.getAttribute('Name')=='TargetUserName':
targetUsername = data.childNodes[0].data

if data.getAttribute('Name')=='ProcessName':
ProcessName = data.childNodes[0].data

if use_filter is True and eventId==EventID:
ip=''
if re.search('^\d+',IpAddress):
ip = IP(IpAddress).ip_address

yield IpAddress,ip,targetUsername,ProcessName

def main():
path=r'D:\share\1.evtx'
filter_id = '4624'
app = WindowsLogger(path)
app.parse_log_detail(filter_id)

if __name__ == '__main__':
main()
D:\share\1.evtx 为日志导出文件

原创文章,转载请注明出处:
http://30daydo.com/article/44130 
 
完整代码,可以通过公众号回复: windows日志解析获取
 

  查看全部
最近大致浏览了下windows server的日志记录,发现有不少的异地IP进行了登录尝试,而且有部分是登录成功的,但不确定是否本人自己登陆,所以借助python,对日志进行解析,并根据IP查询其远程物理地址。
 
最终效果:


cmd_vKiBIjQLpd.png

【MD,老毛子就是天天在扫描,爆破密码,即使改了端口还是在枚举】
 
大致代码如下:
import mmap
import contextlib
from Evtx.Evtx import FileHeader
from Evtx.Views import evtx_file_xml_view
from xml.dom import minidom
from ip_convertor import IP
import re

class WindowsLogger():

def __init__(self,path):
self.path = path
self.formator = 'IP:{:10}\tlocation:{:20}\tUser:{:15}\tProcess:{}'

def read_file(self):
with open(self.path,'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ)) as buf:
fh = FileHeader(buf,0)
return fh

return None

def parse_log_detail(self,filteID):
with open(self.path,'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(),0,access=mmap.ACCESS_READ)) as buf:
fh = FileHeader(buf,0)
for xml, record in evtx_file_xml_view(fh):
#只输出事件ID为4624的内容
# InterestEvent(xml,4624)
for IpAddress,ip,targetUsername,ProcessName in self.filter_event(xml,filteID):
print(self.formator.format(IpAddress,ip,targetUsername,ProcessName))

# 过滤掉不需要的事件,输出感兴趣的事件
def filter_event(self,xml,EventID,use_filter=True):
xmldoc = minidom.parseString(xml)
# 获取EventID节点的事件ID
collections = xmldoc.documentElement
events=xmldoc.getElementsByTagName('Event')
for evt in events:
eventId = evt.getElementsByTagName('EventID')[0].childNodes[0].data
time_create = evt.getElementsByTagName('TimeCreated')[0].getAttribute('SystemTime')
eventData = evt.getElementsByTagName('EventData')[0]

for data in eventData.getElementsByTagName('Data'):
if data.getAttribute('Name')=='IpAddress':
IpAddress=data.childNodes[0].data

if data.getAttribute('Name')=='TargetUserName':
targetUsername = data.childNodes[0].data

if data.getAttribute('Name')=='ProcessName':
ProcessName = data.childNodes[0].data

if use_filter is True and eventId==EventID:
ip=''
if re.search('^\d+',IpAddress):
ip = IP(IpAddress).ip_address

yield IpAddress,ip,targetUsername,ProcessName

def main():
path=r'D:\share\1.evtx'
filter_id = '4624'
app = WindowsLogger(path)
app.parse_log_detail(filter_id)

if __name__ == '__main__':
main()

D:\share\1.evtx 为日志导出文件

原创文章,转载请注明出处:
http://30daydo.com/article/44130 
 
完整代码,可以通过公众号回复: windows日志解析获取