import os
import copy
import requests
import subprocess
from typing import Tuple, Optional, List
from .models import QualityOptions, StreamCallback
from .streamer import BasicStreamer
from . import utils
[docs]class BilibiliAPI:
def __init__(self, cookie_filename: Optional[str] = None) -> None:
if cookie_filename is not None:
import http.cookiejar as cookiejar
self.cookies = cookiejar.MozillaCookieJar()
self.cookies.load(cookie_filename, ignore_discard=True, ignore_expires=True)
else:
self.cookies = None
def _request(self, url: str, **kwargs) -> requests.Response:
if self.cookies is not None:
kwargs["cookies"] = self.cookies
return requests.get(url, **kwargs)
def _interface_request(self, path, **kwargs) -> dict:
return self._request("https://api.bilibili.com" + path, **kwargs).json()
[docs] def get_favorites(self, favorite_id: int, page: int = 1) -> list:
path = "/x/v3/fav/resource/list?media_id={}&pn={}&ps=20&order=mtime".format(favorite_id, page)
favourites = self._interface_request(path)["data"]["medias"]
return favourites or []
[docs] def get_favorites_all(self, favorite_id: int) -> List[dict]:
page = 1
favorites = []
while True:
page_favorites = self.get_favorites(favorite_id, page)
if len(page_favorites) == 0:
break
favorites += page_favorites
page += 1
return favorites
[docs] def get_favorites_since(self, favorite_id: int, from_timestamp: int) -> List[dict]:
page = 1
favorites = []
while True:
page_favorites = self.get_favorites(favorite_id, page)
if len(page_favorites) == 0:
break
for fav in page_favorites:
if fav["fav_time"] < from_timestamp:
break
favorites.append(fav)
page += 1
return favorites
[docs] def list_user_favourite_folders(self, uid: int) -> List[dict]:
path = "/x/v3/fav/folder/created/list-all?up_mid={}".format(uid)
folders = self._interface_request(path)["data"]["list"]
return folders or []
[docs] def get_video(self, aid: int) -> dict:
path = "/x/web-interface/view?aid={}".format(aid)
return self._interface_request(path)["data"]
[docs] def get_live_danmaku(self, cid: int) -> bytes:
url = "https://comment.bilibili.com/{}.xml".format(cid)
return self._request(url).content
[docs] def get_archive(self, category_id: int, tag_id: Optional[int] = None, page: int = 1) -> Tuple[int, List[dict]]:
if tag_id is not None:
path = "/x/tag/ranking/archives?ps=20&pn={}&rid={}&tag_id={}".format(page, category_id, tag_id)
else:
path = "/x/web-interface/newlist?ps=20&pn={}&rid={}".format(page, category_id)
response = self._interface_request(path)
videos = response["data"]["archives"]
count = response["data"]["page"]["count"]
return count, videos
[docs] def list_stickers(self) -> List[dict]:
path = "/x/emote/setting/panel?business=reply"
packages = self._interface_request(path)["data"]["all_packages"]
return packages or []
[docs] def get_sticker(self, sticker_id: int) -> dict:
url = "/x/emote/package?business=reply&ids={}".format(sticker_id)
sticker = self._interface_request(url)["data"]["packages"]
return sticker or sticker
[docs] def get_cover_picture(self, aid) -> Tuple[str, bytes]:
url = self.get_video(aid)["pic"]
basename = os.path.basename(url)
stream = requests.get(url).content
return basename, stream
[docs] def get_stream_url(self, aid: int, cid: int, quality_options: QualityOptions) -> dict:
path = f"/x/player/playurl?avid={aid}&cid={cid}&fnver=0&fnval={quality_options.quality_flag}&fourk=1"
stream_urls = self._interface_request(path)["data"]["dash"]
audio = stream_urls["audio"][0]
video = stream_urls["video"][0]
result_quality = copy.deepcopy(quality_options)
if quality_options.h265:
h265_streams = list(filter(lambda stream: stream["codecid"] == 12, stream_urls["video"]))
if len(h265_streams) > 0:
video = h265_streams[0]
result_quality.h265 = True
result_quality.dolby_vision = (video["id"] == 126)
result_quality.dolby_audio = (stream_urls.get("dolby", {}).get("audio") is not None)
result_quality.flac_audio = (stream_urls.get("flac", None) is not None)
if result_quality.dolby_audio:
audio = stream_urls["dolby"]["audio"][0]
if result_quality.flac_audio:
audio = stream_urls["flac"]["audio"]
return {"audio": audio["base_url"], "video": video["base_url"], "quality": result_quality}
[docs] def get_stream(self, url: str, tag: str = "", chunk_size: int = 8192, callback: StreamCallback = None) -> bytes:
headers = {
"User-Agent": "Bilibili Freedoooooom/MarkII",
"Referer": "https://www.bilibili.com/",
"Accept": "*/*",
"Icy-MetaData": "1"
}
kwargs = {"tag": tag, "chunk_size": chunk_size, "callback": callback, "cookies": self.cookies}
streamer = BasicStreamer(url, headers, **kwargs)
return streamer.start()
[docs] def save_stream(self, aid: int, cid: int, quality_options: QualityOptions, dest_file: str,
audio_only: bool = False, host: Optional[str] = None,
chunk_size: int = 8192, callback: StreamCallback = None) -> subprocess.CompletedProcess:
from . import codec
stream_url = self.get_stream_url(aid, cid, quality_options)
if host is not None:
stream_url["audio"] = utils.replace_host(stream_url["audio"], host)
stream_url["video"] = utils.replace_host(stream_url["video"], host)
audio_stream = self.get_stream(stream_url["audio"], "audio", chunk_size, callback)
if not audio_only:
video_stream = self.get_stream(stream_url["video"], "video", chunk_size, callback)
return codec.merge(audio_stream, video_stream, dest_file)
quality: QualityOptions = stream_url["quality"]
return codec.extract_audio(audio_stream, dest_file, quality)