获取中国天气网的 地区-代号 映射表

万年历api无法建立连接了,不知道为什么(XvX),于是想换一个api给机器人查天气

注意

程序编写于2022/09/28,注意代码的时效性

代码

最终结果的格式为 `dict[地区名, 地区代码]`,使用 pickle 库以二进制形式写入文件 `stations.silksINFn`

请求天气的链接形如:http://t.weather.sojson.com/api/weather/city/101010100

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import aiohttp
import asyncio
import json
import pickle

# Size of the asyncio.Semaphore created in query(); every fetch coroutine
# acquires it around its HTTP request, so this caps concurrent requests.
SEMAPHORE = 8

async def get_provinces_code() -> dict[str, str]:
    """Download and decode the top-level ``china.html`` table.

    Uses the module-level ``session`` and ``semaphore`` globals, which
    ``query()`` assigns before awaiting this coroutine.

    Returns:
        The JSON object from the response body as a dict. ``query()`` treats
        the keys as province codes (presumably values are province names).
    """
    endpoint = 'http://www.weather.com.cn/data/city3jdata/china.html'
    # Take a semaphore slot first, then open the request; both are released
    # when the block exits.
    async with semaphore, session.get(endpoint) as response:
        body = await response.text()
    # The body is parsed manually from text rather than via response.json().
    return json.loads(body)

async def get_cities_code(province_code: str) -> tuple[str, dict[str, str]]:
    """Download the city table belonging to one province.

    Uses the module-level ``session`` and ``semaphore`` globals, which
    ``query()`` assigns before awaiting this coroutine.

    Args:
        province_code: Code substituted into the ``provshi/{}.html`` URL.

    Returns:
        ``(province_code, table)`` where ``table`` is the decoded JSON object.
        ``query()`` appends each key of ``table`` to ``province_code`` to form
        a full city code (values are presumably city names).
    """
    template = 'http://www.weather.com.cn/data/city3jdata/provshi/{}.html'
    endpoint = template.format(province_code)
    # Semaphore slot and HTTP response are acquired together and released
    # together when the block exits.
    async with semaphore, session.get(endpoint) as response:
        body = await response.text()
    # Echo the input code back so the caller can pair results with requests.
    return province_code, json.loads(body)

async def get_stations_code(city_code: str) -> tuple[str, dict[str, str]]:
    """Download the station table belonging to one city.

    Uses the module-level ``session`` and ``semaphore`` globals, which
    ``query()`` assigns before awaiting this coroutine.

    Args:
        city_code: Code substituted into the ``station/{}.html`` URL.

    Returns:
        ``(city_code, table)`` where ``table`` is the decoded JSON object.
        ``query()`` uses each value of ``table`` as a station name and appends
        each key to ``city_code`` to form the full station code.
    """
    template = 'http://www.weather.com.cn/data/city3jdata/station/{}.html'
    endpoint = template.format(city_code)
    # Semaphore slot and HTTP response are acquired together and released
    # together when the block exits.
    async with semaphore, session.get(endpoint) as response:
        body = await response.text()
    # Echo the input code back so the caller can pair results with requests.
    return city_code, json.loads(body)

async def query():
    """Crawl province -> city -> station tables and pickle a name -> code map.

    Steps:
        1. Create the shared ``session`` / ``semaphore`` globals that the
           ``get_*_code`` coroutines read.
        2. Fetch the province table, then every province's city table, then
           every city's station table (each level fetched concurrently,
           bounded by the semaphore).
        3. Build ``{station name: city_code + station key}`` and write it with
           ``pickle`` to ``stations.silksINFn`` in the working directory.

    If two stations share a name, the one processed later overwrites the
    earlier entry, because the name is the dict key.

    The HTTP session is closed before returning, whether or not the crawl
    succeeded. Any exception raised by a request propagates to the caller.
    """
    global session, semaphore
    session = aiohttp.ClientSession()
    semaphore = asyncio.Semaphore(SEMAPHORE)

    try:
        provinces_dict: dict[str, str] = await get_provinces_code()

        # gather() is used instead of asyncio.wait(): it accepts an empty
        # argument list (wait() raises ValueError on one), keeps results in
        # request order, and re-raises a failed request's exception directly.
        city_results: list[tuple[str, dict[str, str]]] = await asyncio.gather(
            *(get_cities_code(code) for code in provinces_dict)
        )

        # Full city code = province code followed by the city's key.
        city_codes: list[str] = []
        for province_code, cities_dict in city_results:
            city_codes.extend(province_code + code for code in cities_dict)

        station_results: list[tuple[str, dict[str, str]]] = await asyncio.gather(
            *(get_stations_code(each) for each in city_codes)
        )
    finally:
        # ClientSession.close() is a coroutine and must be awaited; doing it
        # here guarantees the connector is released even when a fetch fails.
        await session.close()

    # Full station code = city code followed by the station's key.
    final_dict: dict[str, str] = {}
    for city_code, stations_dict in station_results:
        for code, station in stations_dict.items():
            final_dict[station] = city_code + code

    with open('stations.silksINFn', 'wb') as f:
        pickle.dump(final_dict, f)

def main():
    """Run ``query()`` to completion on a fresh event loop, then clean up.

    The loop is stored in the module-level ``loop`` global, as before. After
    ``query()`` finishes or raises, any still-open aiohttp session left in the
    module-level ``session`` global is closed and the loop itself is closed.
    Exceptions from ``query()`` propagate to the caller.
    """
    global loop

    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running or set.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(query())
    finally:
        # query() may have failed before assigning the global, so look it up
        # defensively instead of risking a NameError that masks the real error.
        open_session = globals().get('session')
        if open_session is not None and not open_session.closed:
            # ClientSession.close() is a coroutine; it has to be driven by the
            # loop, a bare call would only create an un-awaited coroutine.
            loop.run_until_complete(open_session.close())
        loop.close()


# Only crawl when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()


# Example weather query URL (the trailing number is a station code from the generated map):
# http://t.weather.sojson.com/api/weather/city/101010100