# instance2/Instance.py
import os
import requests
import json
import time
from threading import Thread, Event
from requests.exceptions import RequestException
from tqdm import tqdm
import logging
CACHE_DIR = os.getenv("CACHE_DIR")
download_progress = {}  # Shared progress map, keyed by film/episode download id
class Instance:
def __init__(self, id, url, cache_dir, token, repo, load_balancer_api, max_retries=20, initial_delay=1):
self.version = "0.0.0.1 Alpha"
self.id = id
self.url = url
self.CACHE_DIR = cache_dir
self.TOKEN = token
self.REPO = repo
self.FILM_STORE = {}
self.TV_STORE = {}
self.download_threads = {}
self.file_structure = None
self.load_balancer_api = load_balancer_api
self.max_retries = max_retries
self.initial_delay = initial_delay
self.last_report_time = time.time()
self.re_register_event = Event()
# Ensure CACHE_DIR exists
if not os.path.exists(self.CACHE_DIR):
os.makedirs(self.CACHE_DIR)
self.register_to_load_balancer()
self.reload_file_structure()
registration_thread = Thread(target=self.monitor_registration)
registration_thread.daemon = True
registration_thread.start()
        # Start the thread to re-index every 5 minutes
indexer_thread = Thread(target=self.get_file_structure_periodically)
indexer_thread.daemon = True
indexer_thread.start()
def reload_file_structure(self):
"""Runs the indexer and loads the file structure from INDEX_FILE."""
self.file_structure = self.load_balancer_api.get_file_structure()
logging.info("File structure reloaded successfully.")
def get_file_structure_periodically(self):
"""Periodically reruns the indexer and reloads the file structure."""
while True:
time.sleep(300) # Wait for 5 minutes
logging.info("Re-running indexer and reloading file structure.")
self.reload_file_structure()
def compile_report(self):
self.last_report_time = time.time() # Update the last report time
cache_size = self.get_cache_size()
report = {
"instance_id": self.id,
"instance_url": self.url,
"film_store": self.FILM_STORE,
"tv_store": self.TV_STORE,
"cache_size": cache_size
}
return report
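    # Example report shape (values are illustrative; paths are hypothetical):
    # {
    #     "instance_id": "instance2",
    #     "instance_url": "http://localhost:7860",
    #     "film_store": {"Film Title": "/cache/films/Film Title/file.mp4"},
    #     "tv_store": {"Show Title": {"Season 1": {"episode.mp4": "/cache/tv/..."}}},
    #     "cache_size": "1.25 GB"
    # }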
def monitor_registration(self):
while True:
if time.time() - self.last_report_time > 60: # Check if 1 minute has passed
logging.info('1 minute passed since last report. Re-registering...')
self.register_to_load_balancer()
self.last_report_time = time.time() # Reset the last report time
time.sleep(30) # Check every 30 seconds
    def get_cache_size(self):
        """Walks the instance cache directory and returns its total size as a human-readable string."""
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(self.CACHE_DIR):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                total_size += os.path.getsize(fp)
        return f"{total_size / (1024 * 1024 * 1024):.2f} GB"
@staticmethod
def read_json(file_path):
if os.path.exists(file_path):
with open(file_path, 'r') as json_file:
return json.load(json_file)
return {}
def download_film(self, file_url, token, cache_path, film_id, title, chunk_size=100 * 1024 * 1024):
"""
Downloads a file from the specified URL and saves it to the cache path.
Tracks the download progress.
Args:
file_url (str): The URL of the file to download.
token (str): The authorization token for the request.
cache_path (str): The path to save the downloaded file.
film_id (str): Unique identifier for the film download.
title (str): The title of the film.
chunk_size (int): Size of each chunk to download.
"""
print(f"Downloading file from URL: {file_url} to {cache_path}")
headers = {'Authorization': f'Bearer {token}'}
        # Create the progress entry before the request so the except/finally
        # blocks below can update it even when the request itself fails.
        download_progress[film_id] = {"total": 0, "downloaded": 0, "status": "Downloading", "start_time": time.time()}
        try:
            response = requests.get(file_url, headers=headers, stream=True, timeout=30)
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            download_progress[film_id]["total"] = total_size
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, 'wb') as file, tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar:
for data in response.iter_content(chunk_size=chunk_size):
file.write(data)
pbar.update(len(data))
download_progress[film_id]["downloaded"] += len(data)
print(f'File cached to {cache_path} successfully.')
self.FILM_STORE[title] = cache_path
download_progress[film_id]["status"] = "Completed"
except RequestException as e:
print(f"Error downloading file: {e}")
download_progress[film_id]["status"] = "Failed"
except IOError as e:
print(f"Error writing file {cache_path}: {e}")
download_progress[film_id]["status"] = "Failed"
finally:
if download_progress[film_id]["status"] != "Downloading":
download_progress[film_id]["end_time"] = time.time()
@staticmethod
def get_download_progress(id):
"""
Gets the download progress for a specific film.
Args:
film_id (str): The unique identifier for the film download.
Returns:
dict: A dictionary containing the total size, downloaded size, progress percentage, status, and ETA.
"""
if id in download_progress:
total = download_progress[id]["total"]
downloaded = download_progress[id]["downloaded"]
status = download_progress[id].get("status", "In Progress")
progress = (downloaded / total) * 100 if total > 0 else 0
eta = None
if status == "Downloading" and downloaded > 0:
elapsed_time = time.time() - download_progress[id]["start_time"]
estimated_total_time = elapsed_time * (total / downloaded)
eta = estimated_total_time - elapsed_time
elif status == "Completed":
eta = 0
return {"total": total, "downloaded": downloaded, "progress": progress, "status": status, "eta": eta}
return {"total": 0, "downloaded": 0, "progress": 0, "status": "Not Found", "eta": None}
def download_episode(self, file_url, token, cache_path, episode_id, title, chunk_size=100 * 1024 * 1024):
"""
Downloads a file from the specified URL and saves it to the cache path.
Tracks the download progress.
Args:
file_url (str): The URL of the file to download.
token (str): The authorization token for the request.
cache_path (str): The path to save the downloaded file.
            episode_id (str): Unique identifier for the episode download.
            title (str): The title of the TV show.
chunk_size (int): Size of each chunk to download.
"""
print(f"Downloading file from URL: {file_url} to {cache_path}")
headers = {'Authorization': f'Bearer {token}'}
        # Create the progress entry before the request so the except/finally
        # blocks below can update it even when the request itself fails.
        download_progress[episode_id] = {"total": 0, "downloaded": 0, "status": "Downloading", "start_time": time.time()}
        try:
            response = requests.get(file_url, headers=headers, stream=True, timeout=30)
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            download_progress[episode_id]["total"] = total_size
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, 'wb') as file, tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar:
for data in response.iter_content(chunk_size=chunk_size):
file.write(data)
pbar.update(len(data))
download_progress[episode_id]["downloaded"] += len(data)
print(f'File cached to {cache_path} successfully.')
self.update_tv_store(title, cache_path)
download_progress[episode_id]["status"] = "Completed"
except RequestException as e:
print(f"Error downloading file: {e}")
download_progress[episode_id]["status"] = "Failed"
except IOError as e:
print(f"Error writing file {cache_path}: {e}")
download_progress[episode_id]["status"] = "Failed"
finally:
if download_progress[episode_id]["status"] != "Downloading":
download_progress[episode_id]["end_time"] = time.time()
def update_tv_store(self, title, cache_path):
"""
Updates the TV store JSON with the new file, organizing by title, season, and episode.
Args:
title (str): The title of the TV show.
cache_path (str): The local path where the file is saved.
"""
# Extract season and episode information from the cache_path
season_part = os.path.basename(os.path.dirname(cache_path)) # Extracts 'Season 1'
episode_part = os.path.basename(cache_path) # Extracts 'Grand Blue Dreaming - S01E01 - Deep Blue HDTV-720p.mp4'
# Create the structure if not already present
if title not in self.TV_STORE:
self.TV_STORE[title] = {}
if season_part not in self.TV_STORE[title]:
self.TV_STORE[title][season_part] = {}
# Assuming episode_part is unique for each episode within a season
self.TV_STORE[title][season_part][episode_part] = cache_path
print(f'TV store updated with {title}, {season_part}, {episode_part}.')
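        # Resulting layout, e.g. for a cache_path ending in
        # "Season 1/Grand Blue Dreaming - S01E01 - Deep Blue HDTV-720p.mp4":
        # TV_STORE[title]["Season 1"]["Grand Blue Dreaming - S01E01 - Deep Blue HDTV-720p.mp4"] = cache_path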
def load_json(self, file_path):
"""Load JSON data from a file."""
with open(file_path, 'r') as file:
return json.load(file)
def find_movie_path(self, title):
"""Find the path of the movie in the JSON data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory' and directory['path'] == 'films':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'directory':
for item in sub_directory['contents']:
if item['type'] == 'file' and title.lower() in item['path'].lower():
return item['path']
return None
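    # Assumed shape of self.file_structure, inferred from the traversals in this
    # class (entries are illustrative; only 'type', 'path', and 'contents' are used):
    # [
    #     {"type": "directory", "path": "films", "contents": [
    #         {"type": "directory", "path": "films/Some Film (2024)", "contents": [
    #             {"type": "file", "path": "films/Some Film (2024)/Some Film.mp4"}]}]},
    #     {"type": "directory", "path": "tv", "contents": [...]}
    # ]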
def find_tv_path(self, title):
"""Find the path of the TV show in the JSON data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory' and directory['path'] == 'tv':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
return sub_directory['path']
return None
def get_tv_structure(self, title):
"""Find the path of the TV show in the JSON data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory' and directory['path'] == 'tv':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
return sub_directory
return None
def get_film_id(self, title):
"""Generate a film ID based on the title."""
return title.replace(" ", "_").lower()
def bytes_to_human_readable(self, num, suffix="B"):
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if abs(num) < 1024.0:
return f"{num:3.1f} {unit}{suffix}"
num /= 1024.0
return f"{num:.1f} Y{suffix}"
def encode_episodeid(self, title, season, episode):
return f"{title}_{season}_{episode}"
    def register_to_load_balancer(self):
        retries = 0
        delay = self.initial_delay
        max_delay = 120
        while retries < self.max_retries:
            try:
                result = self.load_balancer_api.register_instance(self.id, self.url)
                if result:
                    logging.info(f'Successfully registered instance {self.id} to load balancer.')
                    return result
            except Exception as e:
                logging.error(f'Error during registration: {e}')
            retries += 1
            logging.warning(f'Attempt {retries} to register instance {self.id} failed. Retrying in {delay} seconds...')
            time.sleep(delay)
            delay = min(delay * 2, max_delay)  # Exponential backoff with maximum delay
        logging.error(f'Failed to register instance {self.id} after {self.max_retries} attempts.')
        return None
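
if __name__ == "__main__":
    # Minimal smoke-test sketch. The real `load_balancer_api` client is not part
    # of this module; the stub below only assumes the two methods this class
    # actually calls, register_instance(id, url) and get_file_structure().
    # All ids, URLs, and env var fallbacks here are illustrative.
    class _StubLoadBalancerAPI:
        def register_instance(self, instance_id, instance_url):
            return {"registered": instance_id, "url": instance_url}

        def get_file_structure(self):
            return []

    instance = Instance(
        id="instance2",
        url="http://localhost:7860",
        cache_dir=CACHE_DIR or "./cache",
        token=os.getenv("TOKEN", ""),
        repo=os.getenv("REPO", ""),
        load_balancer_api=_StubLoadBalancerAPI(),
    )
    print(json.dumps(instance.compile_report(), indent=2))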