Source code for scpython.client

import scpython.model.scparticle
from scpython.options import Branch, Language
from scpython.util.enum import getScpBranch, getItemByName
from scpython.exceptions import *

from threading import Thread
import json
import html
import urllib3
import re


[docs]class Client: """A connection to interact with Wikidot. **Arguments** wikidot_token7 :class:`str` — The user's identity. """ __AJAX_CONNECTOR = "/ajax-module-connector.php" def __init__(self, wikidot_token7): self.__token = wikidot_token7 self.__pool = urllib3.PoolManager()
[docs] def getScpArticle(self, code, **kwargs): """Gets the requested SCP. **Arguments** code :class:`str` — The SCP's full code (``"SCP-XXXX-BRANCH"``). branch :class:`Branch` — In case the Client cannot determine which branch the SCP was originally written in, you can specify it manually. language :class:`Language` — The language to return the SCP in. **Returns** article :class:`Article` — The corresponding article. **Throws** :class:`InvalidToken` — The wikidot token7 you have provided is invalid. :class:`ScpArticleNotFound` — The SCP article you searched does not exist. :class:`ScpArticleNotTranslated` — The SCP article you searched exists, but it's not translated in the language you're looking for. """ code = code.upper() branch = kwargs["branch"] if "branch" in kwargs and isinstance(kwargs["branch"], Branch) \ else getScpBranch(code) if "language" not in kwargs or not isinstance(kwargs["language"], Language): language = getItemByName(Language, branch.name) if branch is Branch.INT: language = Language.EN else: language = kwargs["language"] wiki_url = Branch.INT.value if branch is not Branch.EN and language is Language.EN \ else getItemByName(Branch, language.name).value url = wiki_url + code response = self.__pool.request( "GET", url, headers={"Cookie": f"wikidot_token7={self.__token}"} ) if response.status == 200: raw_html = response.data.decode("utf-8") regex = { "id": r"WIKIREQUEST.info.pageId = (\d+);", "page_version": r"<div id=\"page-info\">\D+?: (\d+?), \D+?: .+?<\/div>", "last_change": r"<div id=\"page-info\">.+?<span class=\".+?\">(.+?)<\/span><\/div>", "rating": r"<span class=\"rate-points\">.+?<span class=\".+?\">((?:\+|-)\d+?)<\/span><\/span>", } scp_data = { "url": url, } for var in regex: match = re.search(regex[var], raw_html) result = match.group(1) if match else None scp_data[var] = result raw_tags = re.search(r"<div class=\"page-tags\">\s+<span>\s+(?:<a href=\".+?\">.+?</a>)+", raw_html) \ .group(0) tags = re.findall(r"<a href=\".+?\">(.+?)</a>", raw_tags) scp_data = { **scp_data, "tags": tags } if scp_data["page_version"]: req_poster = Client.__RequestThread(self.__pool) req_poster.set_request( "POST", wiki_url + Client.__AJAX_CONNECTOR, headers={"Cookie": f"wikidot_token7={self.__token}"}, fields={ "page_id": scp_data["id"], "moduleName": "history/PageRevisionListModule", "wikidot_token7": self.__token, "options": "{\"all\":true}", "page": int(scp_data["page_version"])+1, "perpage": "1", } ) req_poster.start() req_page_source = Client.__RequestThread(self.__pool) req_page_source.set_request( "POST", wiki_url + Client.__AJAX_CONNECTOR, headers={"Cookie": f"wikidot_token7={self.__token}"}, fields={ "page_id": scp_data["id"], "moduleName": "viewsource/ViewSourceModule", "wikidot_token7": self.__token, } ) req_page_source.start() author = None if scp_data["page_version"]: req_poster.join() res_poster = req_poster.getResponse() poster_data = json.loads(res_poster.decode("utf-8")) if poster_data["status"] == "wrong_token7": raise InvalidToken() author = None if not res_poster \ else re.search( r"<span class=\"printuser .+?\">(?:<a .+?>)?<img .+/>(?:</a>)?(?:<a .+?>)?(.+?)(?:</a>)*</span>", poster_data["body"] ).group(1) req_page_source.join() res_page_source = req_page_source.getResponse() page_source = "" if not res_page_source \ else json.loads(res_page_source.decode("utf-8"))["body"] page_source = html.unescape( re.match(r"<h1>.+?</h1>\s+<div class=\"page-source\">([\w\W]+)</div>", page_source) .group(1) .replace("<br />", "") ) scp_data = { **scp_data, "branch": branch, "full_code": code, "page_source": page_source, "poster": author, "language": language } return scpython.model.scparticle.ScpArticle(scp_data) elif response.status == 404: if language == branch: raise ScpArticleNotFound(code) original_branch = getScpBranch(code) if original_branch is not branch: raise ScpArticleNotFound(code, branch=branch) exists_at_all = self.__pool.request( "GET", branch.value + code, headers={"Cookie": f"wikidot_token7={self.__token}"} ).status == 200 if exists_at_all: raise ScpArticleNotTranslated(code, language) else: raise ScpArticleNotFound(code) else: raise Exception("Invalid response status")
class __RequestThread(Thread): """A Thread to make requests, to speed up things. **Arguments** pool — The pool to make requests with. """ def __init__(self, pool): super().__init__() self.__pool = pool self.__method = None self.__url = None self.__request_kwargs = {} self.__response = None def set_request(self, method, url, **kwargs): """Sets the request parameters. **Arguments** method :class:`str` — The HTTP Method. url :class:`str` — The URL to send the request to. request_kwargs :class:`dict` — The optional parameters to send to the request. """ self.__method = method self.__url = url self.__request_kwargs = kwargs if kwargs else {} def run(self): if self.__method and self.__url: response = self.__pool.request(self.__method, self.__url, **self.__request_kwargs) if response.status == 200: self.__response = response.data return self.__response = None def getResponse(self): """Returns the last call's response. **Returns** response :class:`bytes` — The request response. """ return self.__response