from flask import Blueprint, request, jsonify from sqlalchemy import and_ from typing import List, Dict, Any, Optional from . import db from .models import * from http import HTTPStatus main = Blueprint('main', __name__) class APIError(Exception): """Custom exception for API errors""" def __init__(self, message: str, status_code: int = 400): self.message = message self.status_code = status_code super().__init__(message) @main.errorhandler(APIError) def handle_api_error(error): return jsonify({'error': error.message}), error.status_code @main.route('/') def index(): return """Don't curse the darkness — light a candle!".
   - Hades""" @main.route('/service_qualify') def service_qualify(): locid: str = request.args.get('locid', '') if not locid: raise APIError("Location ID is required", HTTPStatus.BAD_REQUEST) try: location = ( db.session.query(Locations, ParentLocations) .join(Locations, Locations.ParentLocation_ID == ParentLocations.id) .filter(Locations.LocationIdentifier == locid) .first() ) if not location: return jsonify({ "locid": locid, "status": False, "message": "Location not found" }), HTTPStatus.NOT_FOUND plans = ( db.session.query(ParentLocations, Plans, PlanGroups, Plans2Groups) .join(ParentLocations, ParentLocations.PlanGroup_ID == PlanGroups.id) .join(Plans2Groups, Plans2Groups.PlanGroup_ID == PlanGroups.id) .join(Plans, Plans.id == Plans2Groups.Plan_ID) .filter(ParentLocations.id == location.ParentLocations.id) .all() ) plan_list = [plan.Plans.PlanIdentifier for plan in plans] return jsonify({ "locid": locid, "status": True, "serviceclass": location.Locations.ServiceClass, "newDevelopmentFeeApplicable": 30000, "unitidentifier": location.Locations.UnitIdentifier, "streetaddress": location.Locations.StreetAddress, "suburb": location.Locations.Suburb, "state": location.Locations.State, "postcode": int(location.Locations.Postcode), "country": location.Locations.Country, "plans": plan_list, }), HTTPStatus.OK except Exception as e: db.session.rollback() raise APIError(f"Error processing request: {str(e)}", HTTPStatus.INTERNAL_SERVER_ERROR) @main.route('/address_search') def address_search(): unit: str = request.args.get('unit', '') address: str = request.args.get('address', '') if not address: return jsonify({ "address": address, "status": True, "count": 0, "results": [], }), HTTPStatus.OK try: addresses = ( db.session.query(Locations) .filter(and_( Locations.StreetAddress.ilike(f'%{address}%'), Locations.UnitIdentifier.ilike(f'%{unit}%') )) .limit(5) .all() ) results = [ { 'locid': addr.LocationIdentifier, 'address': ', '.join(( addr.UnitIdentifier, addr.StreetAddress, addr.Suburb, addr.State, addr.Postcode, addr.Country )), 'status': 'active' } for addr in addresses ] return jsonify({ "address": address, "status": True, "count": len(results), "results": results, }), HTTPStatus.OK except Exception as e: db.session.rollback() raise APIError(f"Error searching addresses: {str(e)}", HTTPStatus.INTERNAL_SERVER_ERROR) @main.route('/plan_search') def plan_search(): plan_req: Dict[str] = request.args.to_dict(flat=False) if not plan_req or "plans" not in plan_req: return jsonify({"plans": []}), HTTPStatus.OK else: try: plans = ( db.session.query(Plans) .filter(Plans.PlanIdentifier.in_(plan_req['plans'])) .all() ) plan_list = [ { "plan": plan.PlanIdentifier, "planName": plan.Name, "cost": plan.Cost, "description": plan.Description, "uploadSpeed": plan.UploadSpeed, "downloadSpeed": plan.DownloadSpeed, "wholesale": False } for plan in plans ] return jsonify({"plans": plan_list}), HTTPStatus.OK except Exception as e: db.session.rollback() raise APIError(f"Error searching plans: {str(e)}", HTTPStatus.INTERNAL_SERVER_ERROR)