You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
112 lines
3.8 KiB
112 lines
3.8 KiB
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Dict, List
|
|
|
|
from strategies import (
|
|
ScraperStrategy,
|
|
NewsScraperStrategy,
|
|
BooksScraperStrategy,
|
|
TechNewsScraperStrategy
|
|
)
|
|
from models import ScrapedData
|
|
from exceptions import StrategyException, StorageException, ValidationException
|
|
|
|
|
|
class ScraperController:
    """Registry of scraping strategies plus JSON persistence for their output.

    Strategies are stored in a dict keyed by each strategy's ``name``
    attribute. Scraped results are written to, and read back from,
    ``<output_dir>/<strategy_name>/``.
    """

    def __init__(self, output_dir: str = "data"):
        """Create a controller and register the built-in strategies.

        Args:
            output_dir: Root directory under which one sub-folder per
                strategy is created when data is saved.
        """
        self.output_dir = output_dir
        self.strategies: Dict[str, ScraperStrategy] = {}
        self._register_default_strategies()

    def _register_default_strategies(self):
        """Register one instance of each strategy imported by this module."""
        for strategy_cls in (
            NewsScraperStrategy,
            BooksScraperStrategy,
            TechNewsScraperStrategy,
        ):
            self.register_strategy(strategy_cls())

    def register_strategy(self, strategy: ScraperStrategy):
        """Store *strategy* under ``strategy.name``.

        A strategy already registered under the same name is replaced.
        """
        self.strategies[strategy.name] = strategy

    def get_strategy(self, name: str) -> ScraperStrategy:
        """Return the strategy registered under *name*.

        Raises:
            StrategyException: If no strategy with that name is registered;
                the message lists the names that are available.
        """
        if name in self.strategies:
            return self.strategies[name]

        available = ', '.join(self.strategies)
        raise StrategyException(
            f"Strategy '{name}' not found. Available: {available}",
            strategy_name=name,
        )

    def list_strategies(self) -> List[Dict[str, str]]:
        """Return one ``{"name": ..., "source": ...}`` dict per strategy."""
        return [
            {"name": s.name, "source": s.source}
            for s in self.strategies.values()
        ]

    def execute_scrape(self, strategy_name: str) -> ScrapedData:
        """Look up *strategy_name* and return the result of its ``scrape()``.

        Raises:
            StrategyException: If the strategy is not registered.
        """
        return self.get_strategy(strategy_name).scrape()

    def save_data(self, data: ScrapedData, strategy_name: str) -> str:
        """Write ``data.to_dict()`` as UTF-8 JSON and return the file path.

        The file is created at
        ``<output_dir>/<strategy_name>/scraped_data_<YYYYmmdd_HHMMSS>.json``;
        the folder is created if it does not exist.

        Raises:
            StorageException: If any step (path building, directory
                creation, serialisation, writing) fails. The original error
                is attached as ``original_exception`` and as ``__cause__``.
        """
        # Bound before the try block so the error handler can always refer
        # to it, even when building the path itself is what failed.
        folder_path = None
        try:
            folder_path = os.path.join(self.output_dir, strategy_name)
            os.makedirs(folder_path, exist_ok=True)

            # NOTE: one-second resolution and mode 'w' mean two saves for the
            # same strategy within the same second overwrite each other.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"scraped_data_{timestamp}.json"
            file_path = os.path.join(folder_path, filename)

            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data.to_dict(), f, ensure_ascii=False, indent=2)

            return file_path
        except Exception as e:
            raise StorageException(
                f"Failed to save data to {folder_path}",
                file_path=folder_path,
                original_exception=e,
            ) from e

    def delete_data(self, file_path: str) -> bool:
        """Delete *file_path* if it exists.

        Returns:
            ``True`` if the file was removed, ``False`` if there was nothing
            to remove.

        Raises:
            StorageException: If the removal fails for any other reason.
        """
        try:
            if not os.path.exists(file_path):
                return False
            os.remove(file_path)
            return True
        except FileNotFoundError:
            # The file vanished between the existence check and the removal;
            # the outcome is the same as if it had never been there.
            return False
        except Exception as e:
            raise StorageException(
                f"Failed to delete file {file_path}",
                file_path=file_path,
                original_exception=e,
            ) from e

    def load_data(self, strategy_name: str, filename: str = None) -> Dict:
        """Load previously saved JSON data for *strategy_name*.

        Args:
            strategy_name: Sub-folder of ``output_dir`` to read from.
            filename: File inside that folder to load. When omitted (or
                empty), the ``.json`` file whose name sorts last is used;
                for files written by ``save_data`` that is the one with the
                most recent timestamp.

        Returns:
            The decoded JSON content exactly as ``json.load`` produces it.
            No ``ScrapedData`` instance is reconstructed here.

        Raises:
            StorageException: If the strategy folder is missing, contains no
                ``.json`` file, or the file cannot be read or parsed.
        """
        # Bound up front so the generic error handler can report it (as None
        # when the failure happened before a file was selected).
        file_path = None
        try:
            folder_path = os.path.join(self.output_dir, strategy_name)
            if not os.path.exists(folder_path):
                raise StorageException(
                    f"No data found for strategy '{strategy_name}'",
                    file_path=folder_path,
                )

            if filename:
                file_path = os.path.join(folder_path, filename)
            else:
                files = sorted(
                    f for f in os.listdir(folder_path) if f.endswith('.json')
                )
                if not files:
                    raise StorageException(
                        f"No data files found in {folder_path}",
                        file_path=folder_path,
                    )
                file_path = os.path.join(folder_path, files[-1])

            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except StorageException:
            # Already the domain error with a precise message; pass through.
            raise
        except Exception as e:
            raise StorageException(
                "Failed to load data",
                file_path=file_path,
                original_exception=e,
            ) from e
|
|
|