mirror of
https://github.com/ClaytonWWilson/Scraper-for-theTVDB.com.git
synced 2025-12-15 17:28:46 +00:00
Refactoring - Moving code around to make it more intuitive
This commit is contained in:
parent
de2bc5c576
commit
754df95b70
@ -4,32 +4,6 @@ import datetime
|
||||
|
||||
import dateutil.parser
|
||||
|
||||
from actions import refreshToken
|
||||
from checks import checkTimestamp
|
||||
from checks import getToken
|
||||
|
||||
class APIConnector:
|
||||
def __init__(self):
|
||||
with open("login.json", "r") as f:
|
||||
self.login = json.loads(f)
|
||||
self.auth_headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"Authorization": "Bearer " + login["TOKEN"]
|
||||
}
|
||||
|
||||
def reload_login(self):
|
||||
with open("login.json", "r") as f:
|
||||
self.login = json.loads(f)
|
||||
self.auth_headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"Authorization": "Bearer " + login["TOKEN"]
|
||||
}
|
||||
|
||||
def send_http_req(api_path):
|
||||
return requests.get(api_path, headers=self.auth_headers)
|
||||
|
||||
|
||||
def login():
|
||||
if os.path.exists("login.json") == False:
|
||||
@ -87,3 +61,41 @@ def login():
|
||||
print("\nLogin successful!\n")
|
||||
|
||||
# TODO at startup, check token for validity and remove it if it is expired
|
||||
|
||||
|
||||
def getToken(data):#TODO add a timeout and try catch to all requests
|
||||
url = "https://api.thetvdb.com/login"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json"
|
||||
}
|
||||
try:
|
||||
response = requests.post(url, data=json.dumps(data), headers=headers)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
print("An error occurred. Please check your internet and try again.")
|
||||
quit()
|
||||
|
||||
if (checkStatus(response, False)):
|
||||
parsed_token = json.loads(response.content)
|
||||
token = parsed_token["token"]
|
||||
return token
|
||||
else:
|
||||
return ""
|
||||
|
||||
def checkStatus(response, v):
|
||||
if (response.status_code != 200):
|
||||
if (v == True):
|
||||
print("\nAn error occurred.")
|
||||
print("HTTP Code: {}".format(str(response.status_code)))
|
||||
# error = json.loads(response.content) # TODO move this somewhere else
|
||||
# print("Response : " + error["Error"])
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
# Returns true if the token is still valid
|
||||
def checkTimestamp(save_time, cur_time):
|
||||
if cur_time - save_time < datetime.timedelta(0, 86100, 0):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
41
checks.py
41
checks.py
@ -1,41 +0,0 @@
|
||||
import json
|
||||
|
||||
import requests
|
||||
import datetime
|
||||
|
||||
def getToken(data):#TODO add a timeout and try catch to all requests
|
||||
url = "https://api.thetvdb.com/login"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json"
|
||||
}
|
||||
try:
|
||||
response = requests.post(url, data=json.dumps(data), headers=headers)
|
||||
except requests.exceptions.ConnectionError as e:
|
||||
print("An error occurred. Please check your internet and try again.")
|
||||
quit()
|
||||
|
||||
if (checkStatus(response, False)):
|
||||
parsed_token = json.loads(response.content)
|
||||
token = parsed_token["token"]
|
||||
return token
|
||||
else:
|
||||
return ""
|
||||
|
||||
def checkStatus(response, v):
|
||||
if (response.status_code != 200):
|
||||
if (v == True):
|
||||
print("\nAn error occurred.")
|
||||
print("HTTP Code: {}".format(str(response.status_code)))
|
||||
# error = json.loads(response.content) # TODO move this somewhere else
|
||||
# print("Response : " + error["Error"])
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
# Returns true if the token is still valid
|
||||
def checkTimestamp(save_time, cur_time):
|
||||
if cur_time - save_time < datetime.timedelta(0, 86100, 0):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
16
launcher.py
16
launcher.py
@ -1,13 +1,8 @@
|
||||
import os
|
||||
|
||||
from login import login
|
||||
from actions import wait
|
||||
from actions import clearScreen
|
||||
from actions import clearFolders
|
||||
from actions import download
|
||||
from actions import installReqs
|
||||
from actions import refreshToken
|
||||
from actions import update
|
||||
from authentication import login
|
||||
from main import download
|
||||
from main import wait
|
||||
from utils import clearFolders
|
||||
from utils import clearScreen
|
||||
from search import search
|
||||
|
||||
# TODO fix naming convention for all variables and functions
|
||||
@ -48,3 +43,4 @@ while True:
|
||||
wait()
|
||||
elif choice == "0":
|
||||
exit()
|
||||
|
||||
|
||||
@ -1,28 +1,19 @@
|
||||
import os
|
||||
import subprocess
|
||||
import shutil
|
||||
import json
|
||||
import datetime
|
||||
|
||||
import requests
|
||||
import datetime
|
||||
import dateutil
|
||||
import json
|
||||
import subprocess
|
||||
|
||||
from utils import APIConnector
|
||||
|
||||
from checks import checkTimestamp
|
||||
from checks import checkStatus
|
||||
from checks import getToken
|
||||
|
||||
# TODO add counters for number of images downloaded and deleted
|
||||
|
||||
def wait():
|
||||
input("Press enter to continue.")
|
||||
|
||||
def clearScreen():
|
||||
IS_WINDOWS = os.name == "nt"
|
||||
if IS_WINDOWS:
|
||||
os.system("cls")
|
||||
else:
|
||||
os.system("clear")
|
||||
|
||||
def refreshToken():
|
||||
if os.path.exists("login.json"):
|
||||
try:
|
||||
@ -86,26 +77,6 @@ def download(series):
|
||||
out.write(json.dumps(json.loads(res.content)))
|
||||
|
||||
|
||||
def clearFolders(): # TODO implement this
|
||||
folders = ["banner", "fanart", "poster"]
|
||||
del_count = 0
|
||||
for folder in folders:
|
||||
if os.path.exists(folder):
|
||||
image_list = os.listdir(folder)
|
||||
if len(image_list) != 0:
|
||||
print("Clearing " + folder + "/")
|
||||
for x in image_list: # TODO check if folder is empty
|
||||
print("Deleting {}/{}".format(folder, x))
|
||||
del_path = os.path.join(folder + "\\" + x)
|
||||
os.remove(del_path)
|
||||
del_count += 1
|
||||
print()
|
||||
else:
|
||||
print("'{}' is already empty".format(folder))
|
||||
else:
|
||||
createFolder(folder)
|
||||
print("Deleted {} images.\n".format(del_count))
|
||||
|
||||
def createFolder(folder): # TODO remove this
|
||||
os.makedirs(folder)
|
||||
|
||||
11
search.py
11
search.py
@ -7,13 +7,10 @@ import re
|
||||
import requests
|
||||
import urllib.parse
|
||||
|
||||
from actions import clearFolders
|
||||
from actions import clearScreen
|
||||
from actions import downloadImages
|
||||
from actions import refreshToken
|
||||
from actions import searchImages
|
||||
from checks import checkTimestamp
|
||||
from checks import checkStatus
|
||||
from utils import clearFolders
|
||||
from utils import clearScreen
|
||||
from authentication import checkTimestamp
|
||||
from authentication import checkStatus
|
||||
|
||||
class Series:
|
||||
def __init__(self, folder_name, id, url):
|
||||
|
||||
54
utils.py
Normal file
54
utils.py
Normal file
@ -0,0 +1,54 @@
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
|
||||
|
||||
|
||||
class APIConnector:
|
||||
def __init__(self):
|
||||
with open("login.json", "r") as f:
|
||||
self.login = json.load(f)
|
||||
self.auth_headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"Authorization": "Bearer " + self.login["TOKEN"]
|
||||
}
|
||||
|
||||
def reload_login(self):
|
||||
with open("login.json", "r") as f:
|
||||
self.login = json.load(f)
|
||||
self.auth_headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"Authorization": "Bearer " + self.login["TOKEN"]
|
||||
}
|
||||
|
||||
def send_http_req(self, api_path):
|
||||
return requests.get(api_path, headers=self.auth_headers)
|
||||
|
||||
def clearFolders(): # TODO implement this
|
||||
folders = ["banner", "fanart", "poster"]
|
||||
del_count = 0
|
||||
for folder in folders:
|
||||
if os.path.exists(folder):
|
||||
image_list = os.listdir(folder)
|
||||
if len(image_list) != 0:
|
||||
print("Clearing " + folder + "/")
|
||||
for x in image_list: # TODO check if folder is empty
|
||||
print("Deleting {}/{}".format(folder, x))
|
||||
del_path = os.path.join(folder + "\\" + x)
|
||||
os.remove(del_path)
|
||||
del_count += 1
|
||||
print()
|
||||
else:
|
||||
print("'{}' is already empty".format(folder))
|
||||
else:
|
||||
createFolder(folder)
|
||||
print("Deleted {} images.\n".format(del_count))
|
||||
|
||||
def clearScreen():
|
||||
IS_WINDOWS = os.name == "nt"
|
||||
if IS_WINDOWS:
|
||||
os.system("cls")
|
||||
else:
|
||||
os.system("clear")
|
||||
Loading…
Reference in New Issue
Block a user