【学习记录】异步爬虫

。。。

jupyter中额外使用

1
2
import nest_asyncio
nest_asyncio.apply()

简易http请求例子

一些前面的东西

1
2
3
4
5
6
import aiohttp
import asyncio
headers = {
'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.63'
}

简易get 请求 例子

猜测其他请求可以根据request库类比

携程 text(encoding默认None, 内部自动处理) 返回 aiohttp.ClientResponse类型, 文档可查

1
2
3
4
5
6
7
async with aiohttp.ClientSession() as session:
async with session.request(
'GET',
'https://wldcmzy.github.io/',
headers = headers
) as res:
print(await res.text(encoding='utf-8'))

等效1

1
2
3
4
5
6
async with aiohttp.ClientSession() as session:
async with session.get(
'https://wldcmzy.github.io/',
headers = headers
) as res:
print(await res.text(encoding='utf-8'))

等效2

1
2
3
4
5
6
7
8
session = aiohttp.ClientSession()
res = await session.request(
'GET',
'https://wldcmzy.github.io/',
headers = headers
)
print(await res.text(encoding='utf-8'))
session.close()

aiohttp.ClientSession构造函数可传一些参数

1
2
3
4
5
6
async with aiohttp.ClientSession(headers = headers) as session:
async with session.get(
'https://wldcmzy.github.io/',
headers = headers
) as res:
print(await res.text(encoding='utf-8'))

异步爬虫简易实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import aiohttp
import asyncio
import time
urls = [
'https://wldcmzy.github.io/',
'https://docs.aiohttp.org/en/stable/index.html',
'https://github.com/Nearrin/HollowKnight.PureZote',
'https://www.baidu.com/',
]
async def getHTML(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
print(time.time(), (await res.text())[ : 100], sep = '\n', end = '\n<<<<<<<<<<<<<\n')

loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(getHTML(each)) for each in urls]
tasks = asyncio.gather(*tasks)
loop.run_until_complete(tasks)


限制并发数量 asyncio.Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import aiohttp
import asyncio
import time

URLS = [
'https://wldcmzy.github.io/',
'https://docs.aiohttp.org/en/stable/index.html',
'https://www.baidu.com/',
'https://blog.csdn.net/mixintu/article/details/102424471',
]

async def getHTML(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
return url, res.status

async def work(lst, semaphore):
async with semaphore:
url, sta = await getHTML(lst)
print(url, sta)

# async def main(loop: asyncio.AbstractEventLoop) -> None:
async def main() -> None:

semaphore = asyncio.Semaphore(2)
tasks = [work(each, semaphore) for each in URLS]
await asyncio.wait(tasks)
print('ok')

if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
st = time.time()
print('wait', st)
loop.run_until_complete(main())
print('over, usetime:', time.time() - st , 's')
st = time.time()
print('wait 2nd', st)
loop.run_until_complete(main())
print('over, usetime:', time.time() - st , 's')
finally:
# loop.close()
print('closed')


例子

爬漫画,貌似被思路限制了速度的异步爬虫

github: Mess-Mess/spider_didiaomh.com_async.py at master · Wldcmzy/Mess-Mess (github.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
from bs4 import BeautifulSoup
import re
from Wrappers import logger, remove_invalid_element_in_windows_path, try_except_ensure
from Wrappers.Affix import Affix_OnlyZeroExample
from Wrappers.HTMLwrapper import Image_HTMLwrapper_Lv2
import os
import aiohttp
import asyncio

class SpiderX:
DOMIN: str = 'https://www.didiaomh.com/'

def __init__(
self,
catalog_url: str,
outpath_name: str = 'out',
capture_format: str = 'capture{arg}',
image_format: str = 'image{arg}.jpg',
max_capture_number_length: int = 3,
max_page_number_length: int = 3,
work_span: tuple[int] = (0, 0),
concurrency: int = 5,
myproxy = None,
# headers: dict = {},
) -> None:
'''
catalog_url:
目录页链接
outpath_name:
输出路径名
capture_format:
章节格式
image_format:
图片格式
max_capture_number_length:
最大章节序号长度
max_page_number_length:
最大图片序号长度
work_span: tuple[int]:
爬取章节范围(小, 大)
默认爬取所有
concurrency:
同时进行的任务数量
proxies:
代理服务器信息
'''
self.catalog_url = catalog_url if catalog_url[ : len(self.DOMIN)] == self.DOMIN else self.DOMIN + catalog_url
self.outpath_name = outpath_name
self.check_path(self.outpath_name)
self.capture_format = capture_format
self.image_format = image_format
self.max_capture_number_length = max_capture_number_length
self.max_page_number_length = max_page_number_length
self.work_span = work_span
self.myproxy = myproxy

self.page_counter = 0
self.affix = Affix_OnlyZeroExample()

self.semaphore = asyncio.Semaphore(concurrency)
self.session = aiohttp.ClientSession()


def toreload_format_page_name(self, index: int):
return self.image_format.format(arg = self.affix.add_prefix(str(index), self.max_page_number_length))
def toreload_format_capture_name(self, index: int, capture_title: str):
return self.capture_format.format(arg = self.affix.add_prefix(str(index), self.max_page_number_length) + capture_title)

def check_path(self, path: str) -> None:
'''检查目录存在性,若目录没有, 则创建'''
if not os.path.exists(path):
os.mkdir(path)
logger.debug(f'建立目录:{path}...')

async def get_html(self, url: str) -> str:
'''抓取网页html信息'''
async with self.semaphore:
async with self.session.get(url, proxy = self.myproxy) as response:
return await response.text()

async def get_catalog(self) -> dict[int, tuple[str, str]]:
'''
生成漫画目录 (章节-url 映射)
return:
{ index : (url, capture_title), }
'''
html = await self.get_html(self.catalog_url)
soup = BeautifulSoup(html)
li_tag_list: list[BeautifulSoup] = soup.find('ul', class_ = 'chapter-list clearfix').findAll('li')
catalog = {}
for i, each in enumerate(li_tag_list):
catalog[i + 1] = self.DOMIN + each.a['href'], each.a.string
return catalog

async def save_image(self, pathname: str, data: bytes) -> None:
'''保存一张图片'''
with open(pathname, 'wb') as f:
f.write(data)

async def download_one_image(self, url: str, pathname: str) -> None:
'''下载一张图片'''
async with self.session.get(url) as res:
await self.save_image(pathname, await res.read())

async def download_one_capture_perpage(self, soup: BeautifulSoup, foldername: str) -> None:
'''下载一html页的图片'''
li_tag_list: list[BeautifulSoup] = soup.find('div', class_ = 'comiclist').findAll('div', class_ = 'comicpage')
tasks = []
for each in li_tag_list:
url: str = each.img['src']
self.page_counter += 1
filename = remove_invalid_element_in_windows_path(self.toreload_format_page_name(self.page_counter))
tasks.append(asyncio.ensure_future(self.download_one_image(url, f'{self.outpath_name}/{foldername}/{filename}')))
await asyncio.wait(tasks)


async def download_one_capture(self, url: str, foldername: str) -> None:
'''下载一章图片'''
self.check_path(f'{self.outpath_name}/{foldername}')
self.page_counter = 0
html = await self.get_html(url)
soup = BeautifulSoup(html)
total_pages = int(re.search('第[0-9]+/([0-9]+)页', soup.find('select', class_ = 'selectpage').option.string).group(1))
logger.log('html page 1')
await self.download_one_capture_perpage(soup, foldername)
for i in range(2, total_pages + 1):
logger.log(f'html page {i}')
html = await self.get_html(url.replace('.html', f'?page={i}'))
soup = BeautifulSoup(html)
await self.download_one_capture_perpage(soup, foldername)

@try_except_ensure
async def download_all_caputres(self) -> None:
'''下载漫画所有图片'''
catalog = await self.get_catalog()
work_span_low, work_span_high = self.work_span
flag = work_span_high > work_span_low
logger.debug(f'work span flag:{flag}, low:{work_span_low}, high:{work_span_high}')
for key, value in catalog.items():
if flag:
if key > work_span_high or key < work_span_low:
continue
logger.log(f'章节进度:{key}/{len(catalog)}')
url, capture_title = value
capture_name = remove_invalid_element_in_windows_path(self.toreload_format_capture_name(key, capture_title))
await self.download_one_capture(url, capture_name)


if __name__ == '__main__':
loop = asyncio.get_event_loop()

x = SpiderX(
'https://www.didiaomh.com/manhua/6500.html',
outpath_name = '偷星九月天IMAGES',
)

try:
loop.run_until_complete(x.download_all_caputres())
except Exception as e:
logger.error(f'{type(e)}|{str(e)}')
finally:
loop.close()

#===========================================================================

class TX9MOONSKY(Image_HTMLwrapper_Lv2):
def __init__(
self,
root: str,
html_path: str,
html_vital_element: str,
html_collection_name,
prefix_LvRoot: str,
prefix_Lv2: str,
) -> None:
super().__init__(
root,
html_path,
html_vital_element,
html_collection_name,
prefix_LvRoot,
prefix_Lv2
)

def toreload_parse_diff_element_title(self, folder_name: str) -> str:
return super().toreload_parse_diff_element_title(folder_name)

def toreload_parse_diff_element_P(self, folder_name: str) -> str:
return super().toreload_parse_diff_element_P(folder_name)[3 : ]


y = TX9MOONSKY(
root = '偷星九月天IMAGES',
html_path = '偷星九月天HTMLS',
html_vital_element= '偷星九月天 {arg}',
html_collection_name= ' 偷星九月天 全集 ',
prefix_LvRoot='capture',
prefix_Lv2='image'
)
y.create_htmls()

爬某漫画网站的Dr.STONE漫画

?

起因

最近在看Mr.STONE,但是没有人做剧透,只好自己去看漫画

其他

一开始写了个同步,但感觉挺慢,于是随便搜了点资料尝试换成异步,以前还没接触过异步爬虫捏,大概是方法错了,没感觉有显著的加速,不响学辣,卷考研去乐。

github: Mess-Mess/spider_DrSTONE at master · Wldcmzy/Mess-Mess (github.com)

3.0版本

由于之前漫画质量不是很好,翻译整合了好几家,繁简都有,而且存在章节错位,于是把download_1.0and2.0.py进行了换源重置,并和pic_to_url.py的功能做了整合,成为download_3.0.py。换源后章节错位不存在了,但还是繁简都有(厚礼蟹)。

后来发现换源后有几章网站那边图片404,可以通过1.0、2.0版本的成果互补。404check.py用于检测哪几章崩了,原理是检测目录中是否有md5值重复的图片,有时会误伤(比如网站抽风一张图放两次)。其他没写的思路:若404图片都是一张(确实都是一张),可以检测特定哈希值。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
import requests
import re
import bs4
import time
import os
import typing

class MyEasylogger:
def __init__(self, path: str = './', filename = 'DownloadLOG.log') -> None:
self.path = path
self.filename = filename
self.makelog('MyEasylogger类被实例化', 'MyEasylogger', False)

def makelog(self, data: str, tag: str = 'default', ifprint: bool = True) -> None:
if ifprint:
print(data)
ifexist = os.path.exists(self.filename)
with open(self.path + self.filename, 'a+') as f:
if not ifexist:
f.write(f'==> {str(time.localtime()[ : 6])} | [MyEasylogger] 日志建立\n')
f.write(f'==> {str(time.localtime()[ : 6])} | [{tag}] {data}\n')

def log(self, data: str, ifprint = True):
self.makelog(data, 'log', ifprint)

def debug(self, data: str, ifprint = True):
self.makelog(data, 'debug', ifprint)

def error(self, data: str, ifprint = True):
self.makelog(data, 'error', ifprint)
logger = MyEasylogger()

def try_except_ensure(func):
def _(*args):
try:
return func(*args)
except Exception as e:
def __(e : Exception, *args) -> None:
logger.error(str(type(e)) + '|' + str(e))
return __(e, *args)
return _

class Downloader:
DOMIN = 'http://rimanb.com/'

HTML = '''
<!DOCTYPE html>
<html>
<head>
<title>{capture}</title>
<meta charset="utf-8">
<style>
body{{
background-color: black;
}}
div#pages{{
text-align: center;
}}
p#p{{
font-size: 35px;
text-align: center;
color: #66ffff;
}}
span#capture_number{{
color: #ff66ff;
}}
</style>

</head>
<body>
{args}
</body>
</html>
'''
IMAGE = ' <div id="pages"><img onload="if(this.width >= document.documentElement.clientWidth){{this.width = document.documentElement.clientWidth}}" align="middle" src="{page}"></img></div>\n'
P = ' <p id="p"><strong>{caricature_name} 第 <span id="capture_number">{p}</span> 话</strong></p>\n'
SEP = '<br><br>\n <hr style="FILTER:alpha(opacity=100,finishopacity=0,style=3)" width="95%"color=#00FF7F SIZE=5>'
def __init__(
self,
catalog_url: str = 'http://rimanb.com/book/2407',
start_end: tuple[int, int] = (39, 232),
image_format: str = '.jpg',
outpath_root: str = './out/',
htmlpath: str = './html/',
caricature_name: str = 'Mr.STONE石纪元',
page_number_length:int = 2,
capture_number_length: int = 3,
headers: dict = {
'User-Agent': r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.81 Safari/537.36 Edg/104.0.1293.47',
},
) -> None:
'''
catalog_url:
漫画的章节列表页面url
start_end:
爬取漫画的起始和终止章节
image_format:
图片保存格式
outpath_root:
输出根路径, 以/结尾
htmlpath:
以便于观看的html文件整合图片,html文件的输出路径
caricature_name:
漫画名称, 用于html要素命名
page_number_length:
命名图片序号所需的字符串长度
capture_number_length:
命名章节序号所需的字符串长度
headers:
请求头信息
'''
self.start, self.end = start_end
self.catalog_url = catalog_url
self.image_format = image_format
self.outpath_root = outpath_root
self.htmlpath = htmlpath
self.caricature_name = caricature_name
self.page_number_length = page_number_length
self.capture_number_length = capture_number_length
self.headers = headers
if not os.path.exists(self.outpath_root):
os.mkdir(self.outpath_root)
logger.debug(f'输出目录{self.outpath_root}不存在, 已经成功建立...')

def prefix_zero(self, ss: typing.Union[str, int], leng: int) -> str:
'''
为字符串ss填充前导0使其达到长度leng
ss:
初始字符串
leng:
目标长度
'''
ss = str(ss)
assert len(ss) <= leng
if len(ss) < leng:
ss = '0' * (leng - len(ss)) + ss
return ss

def format_page_number(self, page: str) -> None:
'''
把页码前面补0扩展到正确长度
page:
页码字符串
'''
return self.prefix_zero(page, self.page_number_length)

def format_capture_number(self, cap: str) -> None:
'''
把章节前面补0扩展到正确长度
cap:
章节码字符串
'''
return self.prefix_zero(cap, self.capture_number_length)

def get_url_dict(self) -> dict[int, str]:
'''
获得漫画每一章的url
return: dic
漫画的所有章节的章节号-url映射表
{章节号 : url}
'''
res = requests.get(self.catalog_url, headers = self.headers)
soup = bs4.BeautifulSoup(res.text)
lst = soup.findAll('span', class_ = 'works-chapter-item')
dic = {}
for each in lst:
id = re.search('第([0-9]+)话', str(each.a['title'])).group(1)
href = each.a['href']
dic[int(id)] = Downloader.DOMIN + href
return dic

def parse_capture(self, url: str) -> list[tuple[int, str]]:
'''
获取漫画特定章节的所有图片的链接
url:
漫画特定章节的url
return: lst
url对应漫画章节的所有图片链接
[(图片序号, url), ]
'''
res = requests.get(url, headers = self.headers)
soup = bs4.BeautifulSoup(res.text)
imgs = soup.find('ul', id = 'comicContain', class_ = 'comic-contain').findAll('li')
lst = []
for i, each in enumerate(imgs):
lst.append((i + 1, Downloader.DOMIN + each.img['src']))
return lst

def download_one_picture(self, url: str, filepath: str, filename: str) -> None:
'''
下载一张图片
url:
图片地址
filepath:
文件路径, 以"/"结尾
filename:
文件名
'''
res = requests.get(url, headers = self.headers)
with open(f'{filepath}{filename}.{self.image_format}', 'wb') as f:
f.write(res.content)
logger.log(f'下载图片并保存为{filepath}{filename}.{self.image_format}完成...')

@try_except_ensure
def run(self) -> None:
'''
下载图片
下载的漫画链接为: self.catalog_url
下载的起始章节为: self.start
下载的结束章节为: self.end
下载的图片保存为: self.outpath_root/capture章节号/page图片号.self.imageformat
'''
logger.log(f'开始下载,漫画目录s链接为:{self.catalog_url}')
logger.log('正在获取章节目录...')
self.url_dict = self.get_url_dict()
logger.log(f'章节目录获取完成,共{len(self.url_dict)}个项目...')
for capture in range(self.start, self.end + 1):
logger.log(f'!!!章节进度: {capture}({self.start}-{self.end}) ({capture - self.start + 1}/{self.end - self.start + 1})')
outpath = f'{self.outpath_root}capture{self.format_capture_number(capture)}/'
if not os.path.exists(outpath):
logger.debug(f'创建目录{outpath}')
os.mkdir(outpath)
if capture not in self.url_dict:
logger.error(f'在章节目录中没有第{capture}章的记录(KeyError:{capture})')
continue
image_list = self.parse_capture(self.url_dict[capture])
for pid, purl in image_list:
logger.log(f'!图片进度: ({pid}/{len(image_list)})')
self.download_one_picture(purl, outpath,'page' + self.format_page_number(pid))
logger.log('下载结束...')

def combie_one_caputre_images_to_html(self, folder_name: int) -> str:
'''
把一整章的图片链接都整合为html语言图片块
folder_name:
特定章节对应的文件夹名
return: html
拥有folder_name文件夹中所有以page链接的html文件格式字符串
'''
pages = [each for each in os.listdir(self.outpath_root + folder_name) if each[ : 4] == 'page']
pages = sorted(pages)
html = ''
image_src = '../' + self.outpath_root + folder_name + '/'
for each in pages:
html += Downloader.IMAGE.format(page = image_src + each)
return html

def create_one_html(self, name: str, html: str, collection: bool = False) -> None:
'''
生成一章漫画的html文件
name:
要生成的html文件名称
html:
本章漫画图片的html格式字符串
collection:
本次是否生成总集
'''
index = f'第{int(name[7 : ])}话' if not collection else '总集篇'
with open(self.htmlpath + self.caricature_name + '_' + name + '.html', 'w', encoding='utf-8') as f:
f.write(Downloader.HTML.format(capture = f'{self.caricature_name} {index}', args = html))

def create_htmls(self, collection: bool = True) -> None:
'''
为漫画所有章节生成html文件
collection:
是否生成总集篇文件
'''
if not os.path.exists(self.htmlpath):
os.mkdir(self.htmlpath)
logger.debug(f'{self.htmlpath}不存在, 建立成功')
logger.log(f'开始为漫画创建html文件, {"" if collection else "不"}包括创建总集篇')
capture_folders = [each for each in os.listdir(self.outpath_root) if each[ : 7] == 'capture']
totalhtml = ''
for each in capture_folders:
html = Downloader.SEP
html += Downloader.P.format(p = int(each[7 : ]), caricature_name = self.caricature_name)
html += self.combie_one_caputre_images_to_html(each)
self.create_one_html(each, html)
logger.log(each + 'html创建完成...')
if collection:
totalhtml += html
if collection:
self.create_one_html('ALL CAPTURES', totalhtml, True)
logger.log('总集篇html创建完成...')


if __name__ == '__main__':
d = Downloader()
d.run()
d.create_htmls()

404check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import os
import hashlib

path = 'out/'
folders = os.listdir(path)
lst404 = []
for folder in folders:
pages = os.listdir(path + folder + '/')
dic = {}
for page in pages:
with open(path + folder + '/' + page, 'rb') as f:
hasher = hashlib.md5(f.read())
hash = hasher.hexdigest()
if hash not in dic:
dic[hash] = 1
else:
lst404.append(folder)
print(folder)
break
with open('404captures.txt', 'w') as f:
for each in lst404:
f.write(each + '\n')

其他历史版本

代码(有漏洞版本)

漏洞为:直接按照两相邻的章节之间的差异设置批量爬取逻辑,忽略了个别中间的无序章节,导致从最早的无序章节开始至以后的章节全部错误,即使后期顺序关系恢复的,也无法正确爬取。

解决方案:(见代码修正版本)先获取从上级页面获取章节目录,得到每一章节和其链接的映射表,查表定位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154

from asyncio.log import logger
import requests
from bs4 import BeautifulSoup
import re
from time import localtime
from os import mkdir
from os.path import exists
import asyncio

class Mylogger:
def __init__(self) -> None:
pass

# def inactive(self) -> None:
# self.fp.close()

def log(self, data: str, ifprint = True) -> None:
if ifprint:
print(data)

#换异步后可能报错 log可禁用
#return None
with open('DownloadLOG.log', 'a+') as f:
f.write(f'==> {str(localtime()[ : 6])} | {data}\n')

class Downloader:

DrSTONE_ADDR = 25638 - 1

# 页码长度
PAGE_NUMBER_LENGTH = 2

# 章节长度
CAPTURE_NUMBER_LENGTH = 3

# 起始,截至章节
CAPTURE_START, CAPTURE_END = 1, 233

# 基础漫画地址
BASE_URL = r'https://omyschool.com/article_detail/152/{addr}/Dr.STONE%20%E7%9F%B3%E7%BA%AA%E5%85%83/{cap}%E8%A9%B1/'

# 请求头信息
MY_HEADERS = {
'User-Agent': r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.81 Safari/537.36 Edg/104.0.1293.47',

}

# 保存路径
PATH = './out/'

def __init__(self) -> None:
self.logger = Mylogger()
if not exists(Downloader.PATH):
mkdir(Downloader.PATH)

def prefix_zero(self, ss: str, leng: int) -> str:
'''为字符串ss填充前导0使其达到长度leng'''
ss = str(ss)
if len(ss) < leng:
ss = '0' * (leng - len(ss)) + ss
return ss

def format_page_number(self, page: str) -> None:
'''
把页码前面补0扩展到正确长度
page: 页码字符串
'''
return self.prefix_zero(page, Downloader.PAGE_NUMBER_LENGTH)

def format_capture_number(self, cap: str) -> None:
'''
把章节前面补0扩展到正确长度
cap: 章节码字符串
'''
return self.prefix_zero(cap, Downloader.CAPTURE_NUMBER_LENGTH)

def form_capture_url(self, capture: int) -> str:
'''
返回第capture章的网页地址
capture: 章序号
'''
str_capture = str(capture)
str_addr = str(Downloader.DrSTONE_ADDR + capture)
if capture < 10: str_capture = '0' + str_capture
return Downloader.BASE_URL.format(cap = str_capture, addr = str_addr)

async def download_one_picture(self, url: str, filepath: str, filename: str) -> None:
'''
下载一张图片
url: 图片地址
filepath: 文件路径, 以"/"结尾
filenamer: 文件名
page: 页码号 用于显示进度
'''
try:
fileformat = url.split('.')[-1]
res = requests.get(url, headers=Downloader.MY_HEADERS)
with open(f'{filepath}{filename}.{fileformat}', 'wb') as f:
f.write(res.content)
print(f'{filepath}{filename}.{fileformat}完成...')
except Exception as e:
self.logger.log(f'{type(e)} | {str(e)} | 下载图片{filepath}{filename}时出错。')

def parse_one_capture(self, capture_number: int) -> list[tuple[str, str]]:
'''
解析第capture_number章的内容, 返回这一章所有图片的链接和序号
capture_number: 章序号
'''
try:
url = self.form_capture_url(capture_number)
print(url)
res = requests.get(url, headers=Downloader.MY_HEADERS)
soup = BeautifulSoup(res.text.replace('amp-img', 'amp_img'))
data_list = soup.find_all('div', {'data-id': True})
image_list = []
for each in data_list:
picture_url = each.amp_img['src']
picture_id = re.search('第([0-9]+)页', each.amp_img['alt']).group(1)
image_list.append((picture_url, picture_id))
return image_list
except Exception as e:
self.logger.log(f'{type(e)} | {str(e)} | 解析第{capture_number}章链接时出错,可能部分或全部图片未正确下载。')
return []

def run(self):
'''主方法'''
#try:
print('开始下载...')
for capture in range(Downloader.CAPTURE_START, Downloader.CAPTURE_END + 1):
print(f'章进度:{capture - Downloader.CAPTURE_START + 1} / {Downloader.CAPTURE_END + 1 - Downloader.CAPTURE_START}')
path = f'{Downloader.PATH}capture{self.format_capture_number(capture)}/'
if not exists(path):
mkdir(path)
image_list = self.parse_one_capture(capture)
if not image_list: continue
tasks = []
for picture_url, picture_id in image_list:
tasks.append(asyncio.ensure_future(
self.download_one_picture(
picture_url,
path,
'page' + self.format_page_number(picture_id)
)
))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('下载结束...')
# except Exception as e:
# self.logger.log(f'{type(e)} | {str(e)} | run方法遇到未知错误')

if __name__ == '__main__':
d = Downloader()
d.run()

代码(修正版本)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195

from asyncio.log import logger
import requests
from bs4 import BeautifulSoup
import re
from time import localtime
from os import mkdir
from os.path import exists
import asyncio

class Mylogger:
def __init__(self) -> None:
pass

# def inactive(self) -> None:
# self.fp.close()

def log(self, data: str, ifprint = True) -> None:
if ifprint:
print(data)

#换异步后可能报错 log可禁用
#return None
with open('DownloadLOG.log', 'a+') as f:
f.write(f'==> {str(localtime()[ : 6])} | {data}\n')

class Downloader:

#DrSTONE_ADDR = 25638 - 1
DrSTONE_ADDR = 239049 - 1

# 页码长度
PAGE_NUMBER_LENGTH = 2

# 章节长度
CAPTURE_NUMBER_LENGTH = 3

# 起始,截至章节
CAPTURE_START, CAPTURE_END = 136, 233

# 基础漫画地址
BASE_URL = r'https://omyschool.com/article_detail/152/{addr}/Dr.STONE%20%E7%9F%B3%E7%BA%AA%E5%85%83/{cap}%E8%A9%B1/'

# 请求头信息
MY_HEADERS = {
'User-Agent': r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.81 Safari/537.36 Edg/104.0.1293.47',

}

# 保存路径
PATH = './out/'

def __init__(self) -> None:
self.logger = Mylogger()
if not exists(Downloader.PATH):
mkdir(Downloader.PATH)

def prefix_zero(self, ss: str, leng: int) -> str:
'''为字符串ss填充前导0使其达到长度leng'''
ss = str(ss)
if len(ss) < leng:
ss = '0' * (leng - len(ss)) + ss
return ss

def format_page_number(self, page: str) -> None:
'''
把页码前面补0扩展到正确长度
page: 页码字符串
'''
return self.prefix_zero(page, Downloader.PAGE_NUMBER_LENGTH)

def format_capture_number(self, cap: str) -> None:
'''
把章节前面补0扩展到正确长度
cap: 章节码字符串
'''
return self.prefix_zero(cap, Downloader.CAPTURE_NUMBER_LENGTH)

def form_capture_url(self, capture: int) -> str:
'''
返回第capture章的网页地址
capture: 章序号
'''
str_capture = str(capture)
str_addr = str(Downloader.DrSTONE_ADDR + capture)
if capture < 10: str_capture = '0' + str_capture
return Downloader.BASE_URL.format(cap = str_capture, addr = str_addr)

async def download_one_picture(self, url: str, filepath: str, filename: str) -> None:
'''
下载一张图片
url: 图片地址
filepath: 文件路径, 以"/"结尾
filenamer: 文件名
page: 页码号 用于显示进度
'''
try:
#print(url)
fileformat = url.split('.')[-1]
res = requests.get(url, headers=Downloader.MY_HEADERS)
with open(f'{filepath}{filename}.{fileformat}', 'wb') as f:
f.write(res.content)
print(f'{filepath}{filename}.{fileformat}完成...')
except Exception as e:
self.logger.log(f'{type(e)} | {str(e)} | 下载图片{filepath}{filename}时出错。')

def parse_one_capture(self, capture_number: int) -> list[tuple[str, str]]:
'''
解析第capture_number章的内容, 返回这一章所有图片的链接和序号
capture_number: 章序号
'''
try:
#url = self.form_capture_url(capture_number)
url = self.NEW__form_capture_url(capture_number)
print(url)
res = requests.get(url, headers=Downloader.MY_HEADERS)
soup = BeautifulSoup(res.text.replace('amp-img', 'amp_img'))
data_list = soup.find_all('div', {'data-id': True})
image_list = []
for each in data_list:
picture_url = each.amp_img['src']
picture_id = re.search('第([0-9]+)页', each.amp_img['alt']).group(1)
image_list.append((picture_url, picture_id))
return image_list
except Exception as e:
self.logger.log(f'{type(e)} | {str(e)} | 解析第{capture_number}章链接时出错,可能部分或全部图片未正确下载。')
return []

def run(self):
'''主方法'''

self.link_dict = self.NEW_get_random_capture_url()


#try:
print('开始下载...')
for capture in range(Downloader.CAPTURE_START, Downloader.CAPTURE_END + 1):
print(f'章进度:{capture - Downloader.CAPTURE_START + 1} / {Downloader.CAPTURE_END + 1 - Downloader.CAPTURE_START}')
path = f'{Downloader.PATH}capture{self.format_capture_number(capture)}/'
if not exists(path):
mkdir(path)
image_list = self.parse_one_capture(capture)
if not image_list: continue
tasks = []
for picture_url, picture_id in image_list:
tasks.append(asyncio.ensure_future(
self.download_one_picture(
picture_url,
path,
'page' + self.format_page_number(picture_id)
)
))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('下载结束...')
# except Exception as e:
# self.logger.log(f'{type(e)} | {str(e)} | run方法遇到未知错误')



#def NEW__get_random_capture_url() -> list[tuple[str, str]]:
def NEW_get_random_capture_url(self) -> dict[int, str]:
'''获取Mr.STONE所有章节的链接'''
print('获取章节目录开始...')
DOMIN = 'https://omyschool.com'
DrSTONE_url = r'https://omyschool.com/article_list/152/Dr.STONE%20%E7%9F%B3%E7%BA%AA%E5%85%83/'
res = requests.get(DrSTONE_url, headers=Downloader.MY_HEADERS)
soup = BeautifulSoup(res.text)
lst = soup.findAll('div', {"class": "chapter"})
#ret = []
ret_d = {}
for each in lst:
href = each.a["href"]
if 'Dr.STONE' in href:
id = re.search('([0-9]+)[话|話]', str(each)).group(1)
# ret.append((DOMIN + href, id))
# return sorted(ret, key = lambda x: int(x[1]))
ret_d[int(id)] = DOMIN + href
print('获取章节目录完成...')
return ret_d

def NEW__form_capture_url(self, capture: int) -> str:
'''
返回第capture章的网页地址
capture: 章序号
'''
return self.link_dict[capture]




if __name__ == '__main__':
d = Downloader()
d.run()

EXTRA

字比较小,一张一张图放大看太麻烦了,于是在本地弄成网页,直接滚鼠标就行,还能直接看大图。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import os

PICTURE_PATH = 'out/'
HTML_PATH = 'html/'

def make_one_capture_to_html(folder_name: str) -> None:
pages = [each for each in os.listdir(PICTURE_PATH + folder_name) if each[ : 4] == 'page']
pages = sorted(pages)
html = '''
<!DOCTYPE html>
<html>
<head>
<title>temp</title>
<meta charset="utf-8">
<style>
div#MrSTONEpages{{
text-align: center;
}}
</style>

</head>
<body>
{images}
</body>
</html>
'''
img = ' <div id="MrSTONEpages"><img onload="if(this.width >= document.documentElement.clientWidth){{this.width = document.documentElement.clientWidth}}" align="middle" src="{page}"></img></div>\n'
imgs = ''
RELATIVE_PATH = '../' + PICTURE_PATH + folder_name + '/'
for each in pages:
imgs += img.format(page = RELATIVE_PATH + each)
with open(HTML_PATH + folder_name + '.html', 'w', encoding='utf-8') as f:
f.write(html.format(images = imgs))



if __name__ == '__main__':
capture_folders = [each for each in os.listdir(PICTURE_PATH) if each[ : 7] == 'capture']
for each in capture_folders:
make_one_capture_to_html(each)





nonebot2踩坑

关于适配器

ModuleNotFoundError: No module named ‘nonebot.adapters.onebot’

一开始看到报错二话没想猜测需要nb1,然后把nb1装上,然后烂完了,幸亏有群友帮忙。

image-20220702110628893

安装适配器解决

1
pip install nonebot-adapter-onebot

同时装有nb1和nb2

当时运行报错ModuleNotFoundError: No module named ‘nonebot.log’

两个都卸载,然后安装

1
2
3
4
5
pip uninstall nonebot
pip uninstall nonebot2
pip uninstall nb-cli
pip install nb-cli
pip install nonebot-adapter-onebot

cqhttp 配置连接

OneBot V11 ws地址改了

1
ws://127.0.0.1:port/onebot/v11/ws/

求生之路2zonemod药抗插件服务器搭建

前言

以前曾经写过一篇关于开服的博客,但是现在尝试开服的时候发现老博客的一些内容已经不管用了,而且老博客在转移的过程中,格式出了写问题,所以写一个新的。

普通L4D2服务器的搭建

主要还是搬sir大佬的文章咯

环境和必要组件

1
2
3
4
5
dpkg --add-architecture i386 # enable multi-arch
apt-get update && apt-get upgrade
apt-get install libc6:i386 # install base 32bit libraries
apt-get install lib32z1
apt-get install screen

创建一个名为steam的账户

这个用户名是可以随意指定的,但sir大佬的教程里用的steam,如果我们也用steam的话就不用手动改一些路径了(什么落伍的老方法,今非昔比了)

如果你不适用steam作为账户名,只需要修改SRCDS_USER变量名就好了(后面会说)。

1
2
3
adduser steam
adduser steam sudo
login

安装steamcmd和求生之路2游戏

这是sir大佬的教学文档里写的。主要做了: 安装steamcmd 匿名登录 更改安装路径 安装求生之路2。

1
2
3
4
5
6
7
8
(先进入steam账户的根目录)
wget http://media.steampowered.com/installer/steamcmd_linux.tar.gz
tar -xvzf steamcmd_linux.tar.gz
./steamcmd.sh
login anonymous
force_install_dir ./Steam/steamapps/common/l4d2
app_update 222860 validate
quit

(2022/06/11)但我用的时候会报打不开xxxsteam.so(忘记名字了)库的错误,错误原因未知,我猜测是steamcmd的版本过时了,所以找到了steamcmd的文档SteamCMD - Valve Developer Community (valvesoftware.com),发现可以直接apt安装(Ubuntu), 由于运行steamcmd,会自动创建在当前目录创建.steam目录,而steamcmd的工作目录在./.steam/steamcmd,所以路径想和sir保持一致偷懒的话,路径要修改。

所以流程变成了:

1
2
3
4
5
6
7
(先进入steam账户根目录)
apt install steamcmd
steamcmd
force_install_dir ../../Steam/steamapps/common/l4d2
login anonymous
app_update 222860 validate
quit

开启无插件的本地服务器

安装完求生之路2之后,就可以开求生服务器了,但现在还是官方版本。

运行/home/steam/Steam/steamapps/common/l4d2目录的 srcds_run脚本直接开服即可

ZoneMod药抗插件的安装

插件安装

把sir大佬的项目克隆到相应位置就安装好了。

SirPlease/L4D2-Competitive-Rework: Just refreshing and optimizing the core files a bit, eh? (github.com)

基本配置

./cfg/server.cfg

服务器名称,组号,设置为只有组内成员能搜索到。你可以把这个文件改名为server1.cfg(后面会说)。

1
2
3
hostname "蓝湖畔淅淅沥沥的雨" 
sv_steamgroup "41174273"
sv_steamgroup_exclusive "1"

./Dedicated Server Install Guide/srcds1

修改路径,以及配置IP端口、游戏信息等内容。

这个文件需要被放在/etc/init.d/目录下。

注意PARAMS变量中的server$SVNUM.cfg",这说明你的./cfg/路径下要有server1.cfg文件。当然你可以定义多种配置,写多个启动脚本。

1
2
3
4
5
6
7
SRCDS_USER="steam"

SVNUM=1
IP=0.0.0.0
PORT=27015
NAME=L4D2_Server$SVNUM
PARAMS="-game left4dead2 -ip $IP -port $PORT +sv_clockcorrection_msecs 25 -timeout 10 -tickrate 100 +map c2m1_highway -maxplayers 12 +servercfgfile server$SVNUM.cfg"

更加方便

编写shell脚本来调用脚本srcds~~(什么牛马套娃)~~

1
2
3
4
5
6
7
8
9
10
11
12
if [ $2 = "start" ]
then
/etc/init.d/srcds$1 "start"
elif [ $2 = "stop" ]
then
/etc/init.d/srcds$1 "stop"
elif [ $2 = "restart" ]
then
/etc/init.d/srcds$1 "restart"
else
echo "Unknow command. arg should be [start/stop/restart]"
fi

示例

1
2
3
./startgame.sh 2 restart
相当于
/etc/init.d/srcds2 restart

服务器配置

导航页

sir佬项目的根目录有这样四个txt文件: motd.txt, host.txt, myhost.txt, mymotd.txt

这些文件里可以写字,也可以放一个链接(图片,网页),可以自定义网页然后把链接放上。

其中motd.txt, host.txt是默认服的导航页

myhost.txt, mymotd.txt是加载插件后的导航页

服务器管理员

addons\sourcemod\configs\core.cfg文件中修改PassInfoVar变量的值,它声明了表示密码的变量的名称, 这个值可以随意更改,也可以不改用sir写的默认值。假设我们改成"_abcd"

1
"PassInfoVar" "_abcd"

addons\sourcemod\configs\admins_simple.ini文件中,拉到最下面加入以下内容。

1
2
"名称1" "99:z" "密码1"
"名称2" "99:z" "密码2"

这时候名称1和名称2已经被设为管理员了,如果想使用这两个名称登录,就需要在你的信息中设置对应的密码。

如"名称1"想进入服务器的话,需要先在控制台设置:

1
setinfo "_abcd" "密码1"

如"名称2"想进入服务器的话,需要先在控制台设置:

1
setinfo "_abcd" "密码2"

否则, 进不去,进去了再改成管理员名称也会被题出来。

游戏参数配置 (改zonemod插件)

限制连推次数

路径:./cfg/cfgogl/zonemod/shared_cvars.cfg

confogl_addcvar z_gun_swing_vs_min_penalty 10
confogl_addcvar z_gun_swing_vs_max_penalty 15

AI特感

路径:./cfg/cfgogl/zonemod/shared_cvars.cfg

阻碍ai特感刷新

confogl_addcvar confogl_blockinfectedbots “0”

导演系统允许ai特感刷新

confogl_addcvar director_allow_infected_bots “1”

AI特感貌似不受一些特感参数的限制

让AI生还者吃药加血

方法1

路径:./addons/sourcemod/plugins/optional/

删除(改名就行)插件botpopstop.smx

方法2

./cfg/cfgogl/zonemod/shared_cvars.cfg

将这一句注释

sm plugins load optional/botpopstop.smx

修改地图

路径:./cfg/stripper/zonemod/maps

想用原地图把地图配置文件从目录中移除

安全屋外止疼药最大数量

路径:cfg\cfgogl\zonemod\confogl.cfg

confogl_addcvar confogl_pills_limit 2

特感复活时间

路径:cfg\cfgogl\zonemod\confogl.cfg

confogl_addcvar z_ghost_delay_min 10
confogl_addcvar z_ghost_delay_max 10

插件安装

插件下载

AlliedModders (alliedmods.net)

SourceMod: Half-Life 2 Scripting

插件安装

以这个开局在安全屋生成近战的插件举例

[L4D2] Melee In The Safe Room (with unlock all melee weapons on every campaign) - AlliedModders (alliedmods.net)

在默认服安装

插件网页中说:copy the [L4D2] MeleeInTheSafeRoom.smx to your servers sourcemod/plugins folder. 也就是把smx文文件拷贝到addons/sourcemod/plugins目录下。 这样就可以在默认服使用该插件。

这个插件还有一些自定义选项,这里不做介绍,可以在网页看教程。

在zonemod服务器安装

cfg\cfgogl\zonemod路径下的shared_plugins.cfgconfogl_plugins.cfg文件中都有加载特定插件的语句,可以直接仿照。

先把smx文件拷贝到addons/sourcemod/plugins/optional目录,然后在confogl_plugins.cfg写入sm plugins load optional/[L4D2]MeleeInTheSafeRoom.smx

**注意:**经测试若插件名带空格会加载不上,可以给插件改一个名字

一些错误的解决

错误1

-bash: /etc/init.d/srcds1: /bin/sh^M: bad interpreter: No such file or directory

文件编码有问题。

sir的说明:

If you receive “-bash: /etc/init.d/srcds1: /bin/sh^M: bad interpreter: No such file or directory” error, it means you have dos line ending file You can use dos2unix command on srcds1 file, or use any other method to have this file in unix format

解决方法1:

vi打开文件, 输入:set ff=unix, 然后:wq退出

解决方法2:

sed -i -e ‘s/\r$//’ 文件名

其他

关于游戏更新

./steamcmd.sh
login anonymous
app_update 222860 validate

如果开了插件服务器,则需要等作者更新发布之后,将作者的更新同步到你的服务器

关于screen托盘

查看被托盘的进程,只能看到本账户开的,不能看到其他账户开的。

1
2
3
screen -ls
screen -r
当只有一个进程时, screen -r会直接进入进程, screen -ls 不会

打开进程

1
2
screen -r 进程名称或进程号(pid)
当只有一个进程被托盘时, 输入screen -r直接打开该进程

退出进程

1
2
Ctrl + C 直接杀掉进程
Ctrl + A + D 退出进程页面, 进程被托盘

关于[socket.io实现websocket]部分功能的使用

JavaScript客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 连接请求
socket = io.connect('ws://' + document.domain + ':' + location.port + '/wsmine', {
reconnectionDelayMax: 10000,
//附加在请求里的内容
query: {
"cookie": this_cookie
}
});
//连接时触发
socket.on("connect", (rev) => {
alert("地图生成时间可能较长,请稍等!\n\n扫雷时请不要快速点击,服务器承受不住!");
})
//断开时触发
socket.on("connect", (rev) => {})

//收到特定消息
socket.on("your massage name", (rev) => {})

python flask_socketio 服务端

启动脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 文件1
from flask_socketio import SocketIO
clearmind_socketio = SocketIO()

# 文件2
from flask import Blueprint
clearmind_blueprint = Blueprint('main', __name__)

# 启动脚本
from flask import Flask
from BackEnd.objects import clearmind_socketio
from BackEnd.routes import clearmind_blueprint
from flask_cors import CORS

def create_app(debug = True):
app = Flask(__name__)
app.debug = debug
app.config['SECRET_KEY'] = 'clearmind'
app.register_blueprint(clearmind_blueprint)
clearmind_socketio.init_app(app)
CORS(app, supports_credentials=True)
from BackEnd import events
return app


if __name__ == '__main__':
app = create_app()
clearmind_socketio.run(app, host='0.0.0.0', port=26666)

消息规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import threading
from flask import session, request
from flask_socketio import emit, join_room, leave_room, disconnect

# 命名空间wslogin 连接时响应
@clearmind_socketio.on('connect', namespace='/wslogin')
def login_connect():
print_and_log('收到登录请求...')
try:
username = request.args['username']
password = request.args['password']
'''......'''
if 4 < 5:
# 发送reply消息
emit('reply', reply)
else:
# 发送reply消息
emit('reply', 'deny')
except Exception as e:
print_and_log(str(type(e)),str(e))
disconnect()
return False

# 命名空间wslogin 连接断开时响应
@clearmind_socketio.on('disconnect', namespace='/wslogin')
def login_disconnect():
'''登录连接断开时执行'''
try :
username = request.args['username']
password = request.args['password']
except:
pass
print_and_log(f'登录连接断开... {username}')



# 命名空间wsmine 收到click信息时响应
@clearmind_socketio.on('click', namespace='/wsmine')
def mine_click(info):
'''收到点击地图时间, 把数据中的参数抛给后端处理'''

try :
cookie = request.args['cookie']
data = json.loads(info)
x, y = data['x'], data['y']
username, tm = cookie_user_dict[cookie]
except Exception as e:
print_and_log('>>> error ' + str(type(e)) + ' ' + str(e))
disconnect()
return False

'''代码'''

# 广播broadcast消息
emit('broadcast', json.dumps({'x' : x, 'y' : y, 'color' : color, 'timmer' : timmer, 'username' : username}), broadcast = True)

'''代码'''

# 广播game end消息
emit('game end', json.dumps(CM_server.rank()), broadcast = True)

'''代码'''

# 广播args 消息
emit('args', json.dumps(CM_server.args()), broadcast = True)

关于socket部分功能的使用

前言

都是复制的以前写过的东西

python UDP 客户端+服务端

封装进了一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import socket
import threading
from typing import Tuple

class SR:
def __init__(self, ip = '0.0.0.0', port = 27013, sz = 2048) -> None:
self.__socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.__socket.bind((ip, port))
self.__sz = sz
self.__lock = threading.Lock()
self.__datapool = []


def __t_receive(self) -> None:
'''接收消息, 应该放到单独的线程中'''
while True:
try:
data, address = self.__socket.recvfrom(self.__sz)
self.__lock.acquire()
data = data.decode('UTF-8')
self.__datapool.append((data, address))
self.__lock.release()
except:
break

def receive(self) -> None:
'''开启消息接收的方法'''
t = threading.Thread(target = self.__t_receive, name = 'receiver')
t.start()

def send(self, data, address) -> None:
'''发送消息'''
self.__socket.sendto(data.encode('UTF-8'), address)

def getData(self) -> Tuple[str, Tuple[str, int]]:
'''
从消息队列中提取一条最早的数据
return : (msg, (ip, port))
'''
#self.__lock.acquire()
if len(self.__datapool) == 0: return None
data = self.__datapool.pop(0)
#self.__lock.release()
return data

def inactive(self):
self.__socket.close()

python TCP 服务端

一个摆烂的服务端脚本删减

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# from MemoSqliteFunctions import *
import socket
import threading

HOST = '0.0.0.0'
PORT = 22222
BUFF_SIZE = 4096

def service_run():
# 建立Socket连接, AF_INEF说明使用IPv4地址, SOCK_STREAM指明TCP协议
serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serverSocket.bind((HOST, PORT))
serverSocket.listen(3)# 监听

print(f'Run at {HOST}:{PORT}')

while True:
# 接收TCP连接, 并返回新的Socket对象
sk, addr = serverSocket.accept()
print(f"客户端: {addr} 链接")

task = threading.Thread(target = TCP_task, args=(sk, ), name = 'tcp_task')
task.start()

else:
conn.close()


def TCP_task(sk : socket.socket) -> None:
'''线程任务'''

# ......

while True:
try:
# 接收客户端发送的数据
data = sk.recv(BUFF_SIZE)
data = data.decode('utf-8')
# ......
except:
# ......
pass


if __name__ == '__main__':
service_run()

java(Android开发) UDP 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void uploadUDP(String ip, String port){
new Thread() {
//线程运行的代码
public void run() {
try {
// Toast弹窗必要条件1
Looper.prepare();

InetAddress address = InetAddress.getByName(ip);
DatagramSocket socket = new DatagramSocket();

String data = "xxxxxxx";

// 发消息
byte[] data1 = data.getBytes();
DatagramPacket packet = new DatagramPacket(data1, data1.length, address, Integer.parseInt(port));
socket.send(packet);

//收消息
byte[] data2 = new byte[1 << 5];
DatagramPacket packet2 = new DatagramPacket(data2, data2.length);
socket.receive(packet2);

String reply = new String(data2, 0, packet2.getLength());

Toast.makeText(CloudActivity.this, "receive : "+reply, Toast.LENGTH_SHORT).show();

// Toast弹窗必要条件1
Looper.loop();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();//启动线程
}

java(Android开发) TCP 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private void upload(String ip, String port){
new Thread() {
//线程运行的代码
public void run() {
boolean know_error = false;

// Toast弹窗必要条件1
Looper.prepare();
try {
// ......

InetAddress serverip= InetAddress.getByName(ip);;//定义保存服务器地址的对象
Socket client=new Socket(serverip, Integer.parseInt(port));//定义创建客户端对象的Socket

OutputStream socketOut=client.getOutputStream(); //定义发送信息的输出流对象
InputStream socketIn=client.getInputStream(); //定义接收数据的输入流

byte receive[] = new byte[buff_size]; //定义保存客户端发送来的数据的字节数组

// ......

//发消息
String firstData = "xxxxxxx";
socketOut.write(firstData.getBytes("utf-8"));

//接收数据保存在字节数组中, 然后转String字符串
int len=socketIn.read(receive);
String rev=new String(receive,0,len);

if (! rev.equals("ok")){
know_error = true;
Toast.makeText(CloudActivity.this, "拒绝访问", Toast.LENGTH_SHORT).show();
throw new Exception("access deny");
}

for(int row=0; row<rowSize ; row++) {

String data = Integer.toString(row);

// 发消息
socketOut.write(data.getBytes("utf-8"));

//接收数据保存在字节数组中, 然后转String字符串
len=socketIn.read(receive);
rev=new String(receive,0,len);

if (! rev.equals("ok")){
know_error = true;
Toast.makeText(CloudActivity.this, "传输中遇到错误", Toast.LENGTH_SHORT).show();
throw new Exception("error at half road");
}


}
socketOut.close();
socketIn.close();
client.close();

Toast.makeText(CloudActivity.this, "同步完成", Toast.LENGTH_SHORT).show();
}catch(ConnectException e){
Toast.makeText(CloudActivity.this, "未能成功连接服务器", Toast.LENGTH_SHORT).show();
}
catch(Exception e){
Log.i("tcp-error", e.getMessage() + e.getStackTrace() + e.getClass());
if(! know_error){
Toast.makeText(CloudActivity.this, "发生错误", Toast.LENGTH_SHORT).show();
}
}

// Toast弹窗必要条件2
Looper.loop();
}
}.start();//启动线程
}

Git使用总结(仅部分功能)

部分全局配置

修改全局配置

1
git config --global 配置名称 配置新值

部分全局配置

用户名

1
git config user.name

邮箱

1
git config user.email

HTTP和HTTPS代理

1
2
git config http.proxy
git config https.proxy

新建一个库时Github给出的初始化代码

1
2
3
4
5
6
7
echo "# New Repo" >> README.md
git init
git add README.md
git commit -m "first commit"
git branch -M main
git remote add origin https://github.com/Wldcmzy/234.git
git push -u origin main

解析

初始化

1
git init

建立READM.md 文件

1
echo "# New Repo" >> README.md

添加README.md 至工作缓存

1
git add README.md

将缓存提交至本地库

1
git commit -m "first commit"

更改当前分支名字为main

1
git branch -M main

设置远程连接的仓库

1
git remote add origin https://github.com/Wldcmzy/234.git

将本地仓库提交至远程仓库(将本地origin提交至远程main分支, 写-u设默认,以后只需要git push)

1
git push -u origin main

可以不修改分支名字

1
2
3
4
5
git init
git add README.md
git commit -m "first commit"
git remote add origin https://github.com/Wldcmzy/234.git
git push -u origin master

从远程获取最新数据

1
git pull
1
git pull origin master

其他分支的切换和提交

切换分支

1
git checkout 分支名称

新建分支

1
git checkout -b 分支名称

新建空的分支

使用git checkout -b命令创建的分支是有父节点。

使用git checkout --orphan命令,创建孤立分支

1
git checkout --orphan

使用git rm -rf .来清除拷贝的内容

1
git rm -rf .

现在获得了一个空的文件夹,但是现在分支没完全有,使用git branch -a也是看不到新分支的。

再随便加一点东西(比如README.md),提交上去。

1
2
git add .
git commit -m 'new branch'

然后分支就有了,可以把它提交到远程仓库。

1
git push origin 新分支

对多人扫雷课设核心算法的分析

序言

一开始打算自己写的,但想起来无敌的xxy早在报告中介绍过这一部分了,便直接拿过来用乐。

项目连接

Wldcmzy/Multiplayer-MineSweeper: 网页版多人扫雷 web前端+python flask后端+websocket通讯 JS/CSS代码内置于HTML (github.com)

一些定义

复制时序号错乱,不以序号为准。

(7)开荒:用户点击一个未知状态的方格后,根据开荒规则打开一片区域。开荒有2个阶段:点击开荒、遍历开荒,分别对应两种开荒规则,规则请参照定义第8、9条。

(8)点击开荒规则:点击开荒阶段的规则。先打开用户点击位置的方格。如果用户点击位置方格打开状态不是雷,则打开八方位方格中打开状态为0的方格。

(9)遍历开荒规则:遍历开荒阶段的规则。对打开的八方位方格中的方格继续进行开荒。如果方格打开状态是0,打开八方位的全部方格,然后重复执行该遍历开荒规则;如果方格打开状态是1、2、3、4、5、6、7、8,停止当前方格的遍历开荒。

(1) 时间戳:用户点击请求到达服务器的相对时间顺序。

(2) 用户颜色:每个用户在每一局游戏中会获得一个颜色,雷区地图上用用户颜色标识用户开荒区域。同一用户在同一局游戏中始终具有相同的用户颜色,用户的退出不能更改本局的用户颜色。游戏重开后,用户颜色重置。

(3) 染色:将用户开荒区域的方格由初始颜色变更为用户颜色。

(4) 染色修正:用户颜色的修正。在多人进行游戏时,因为数据传输的延迟,所以前端收到的广播顺序不一定是操作到达服务器的顺序。所以有可能出现用户A的点击请求先到达服务端,由于服务端还未广播A的点击或者前端收到了广播还没有进行开荒,用户B可能点击了和用户A开荒的区域有交集的位置,用户B的点击请求也会到达服务端。服务器会先广播A的点击,后广播B的点击。用户B的广播可能先到前端,这样前端就会将区域判定为用户B的区域导致错误。所以增加染色修正的概念。前端会根据染色修正规则将开荒区域调整为正确的颜色。染色修正规则请参照定义第18条。

(5) 染色修正规则:以时间戳为依据,方格的颜色为时间戳最小的用户对应的用户颜色。

算法分析

1、雷区生成算法

雷区生成算法的参数有:子区域行列数、子区域雷数、雷区地图行数、雷区地图列数、模数、雷区种子、斐波那契数列值。

设置子区域的目的是为了防止部分区域雷的密度不够,一次开荒出大片区域,递归栈空间不够用。同时也是为了更好的用户体验。将雷区地图按照子区域行列数进行划分,依次进行每个子区域的地图生成。划分边界的不够完整的子区域的雷区,按照完整子区域的雷的密度计算得出该区域的雷数。

斐波那契数列值用于辅助进行子区域布雷,每局雷区地图生成初始为数列第1项。

子区域地图生成:

(1)子区域按顺序从上到下、从左到右对每个方格依次进行编号,编号从0开始,每次递增1。最终编号小于子区域雷数的方格为有雷的方格。

(2)定义辅助变量delta,初始化令其等于雷区种子

(3)子区域按顺序从上到下、从左到右枚举,对枚举到的每个格子进行编号的交换。交换规则:先递推一次斐波那契数列,辅助变量delta和斐波那契数列值相乘,并对模数取模。若取模结果大于子区域雷数,持续和斐波那契数列值相乘,直至取模结果小于子区域雷数。delta%行数为交换位置的子区域行号。再将delta除以子区域行数,delta%列数为交换位置的子区域列号。交换枚举位置和交换位置的两个方格的数字。

(4)将子区域地图信息转移到雷区地图上。

2、开荒算法

根据定义中的点击开荒规则和遍历开荒规则进行dfs遍历雷区地图。具体有四种情况:

(1)前端同步本局历史数据时:由于接收到的历史数据是按按照时间戳排好序的,所以直接按照规则进行dfs遍历,同时在遍历过程中进行染色。

(2)前端收到广播时:由于收到的广播顺序不一定是按时间戳排好序的,所以按照规则进行dfs遍历,同时在遍历过程中进行染色,如果在dfs遍历中发生要对已染色的区域再次染色,则参考染色修正规则。

(3)前端进行用户本地开荒时:只按照规则进行dfs遍历,不染色。

(4)后端遍历:因为时间戳表示到达后端的时间,所以后端直接判断初始位置是否被遍历过,若没有,进行遍历即可。

3、用户颜色分配算法

颜色库中规定多种色调的颜色,每种色调有若干亮度不同的颜色。

(1)确定色调:随机一个颜色范围不大于色调的随机数,若该色调使用次数为色调最少使用次数,则确定为该色调,否则向后循环遍历色调,直到找到使用次数为色调最少使用次数的色调。

(2)确定颜色:随机一个不大于该色调颜色数的随机数,若该颜色使用次数为色调内颜色最少使用次数,则确定为该颜色,否则向后循环遍历色调内的颜色,直到找到使用次数为色调内颜色最少使用次数的色调。

4、染色修正规则

在染色的时候,永远遵循染时间戳更小的用户颜色。即在发生要对已染色的区域再次染色时:

(1)如果原方格的时间戳更小,则颜色不更改。

(2)如果新的时间戳更小,则更改方格的颜色和时间戳。

python语言实现服务器对mysql数据库的自动处理

声明

1.本人还是比较菜的,对服务器安全性等内容可能一窍不通,也没有使用框架,仅考虑完成目的。

2.本篇博客仅记录使用本人的总结,仅论述实现目的的基本流程, 不详细讲解知识,数据库初学者建议先学习sql基本语法(时间成本不高)

环境安装

我使用的操作系统是Ubuntu20, 装好之后是自带python3, 所以我们直接无脑安装mysql

1
2
apt update
apt install mysql-server

非root账号输入, 当然要能获取root权限才行

1
2
sudo apt update
sudo apt install mysql-server

新建一个账号

这一步没什么用,只是我想新建一个

1
adduser --home /usr/sir sir

mysql登录

mysql数据库创建时会自动生成一串root用户密码,想改密码的可以去查查怎么改。使用root账号可以直接这样登录

1
mysql

非root账户用用户名密码登录, 输入下方语句后会提示你输入密码

1
mysql -u username -p

数据库基本操作

建增删改查等操作示例。

1
2
3
4
select ifUsed from invitation where invitationCode = 'xxx';
update invitation set ifUsed = 'xxx' where invitationCode = 'xxx';
delete from invitation where username = 'xxx';
insert into userInfo values ('xxx', 'xxx', 'xxx', 0, 0, 0);

创建一个数据库用户, @后跟一个ip, 表示允许哪些ip,或哪些子网段的ip访问, 若写%表示所有, localhost顾名思义是本地

identified by 后面写密码

1
create user 'testdbuser'@'localhost' identified by 'abcd';

给用户授权, 当然也可以新建角色给角色授权,然后再把角色权限授权给用户,

1
grant all privileges on table 好几个表 to testdbuser@localhost

python自动操作数据库

安装pymysql库

无脑安装

1
pip install mypysql

pymysql基本操作

1
import pymysql
第一步

首先创建对象游标, 形如下方代码

参数database表示你要操作哪个数据库 ,用户名密码要正确,且用户要有访问权限才行

1
2
3
self.__connection = pymysql.connect(host = self.__host, user = self.__user, password = self.__password, database = self.__database) # 实例化对象

self.__cursor = self.__connection.cursor(cursor=pymysql.cursors.DictCursor) #游标
增删改查

注意除了查询操作外,其他操作(也就是会改变数据库内容的操作)需要最后使用**commit()**方法,否则无法成功真正在数据库中完成操作

1
2
3
4
sql = 'select count(*) cnt from User where uid = %s'
self.__cursor.execute(sql, (uid)) # 使用execute执行
cnt = self.__cursor.fetchone()['cnt'] # 使用fetchone() fetchmany() fetchall() 获取返回结果的一条、多条或全部
if cnt == 0: return '无该用户'

1
2
3
sql = 'insert into User values (%s, %s, %s, %s)'
self.__cursor.execute(sql, (cnt + 1, name, password, False))
self.__connection.commit() #操作设计数据库修改需要执行commit方法

其他的都差不多

根据使用经验,若传参为字符串自带引号,当然可以提前写好之后只execute(sql)

1
2
sql = 'insert into User values (%s, %s, %s, %s)' %(乱七八糟)
self.__cursor.execute(sql)
长时间未操作连接断开

Connection对象的ping 方法可以检查是否连接还在,将参数reconnect设为True表示若连接断开自动重连

1
self.__connection.ping(reconnect=True)