Browse Source

Signed-off-by: Alan Woodman <alan@awoodman.net>

master
Alan Woodman 1 year ago
parent
commit
aa0284cff5
  1. 10
      .gitignore
  2. 19
      __init__.py
  3. 156
      main.py
  4. 91
      models.py

10
.gitignore

@ -138,3 +138,13 @@ dmypy.json
# Cython debug symbols
cython_debug/
venv
__pycache__
Include
Lib
Scripts
pyvenv.cfg
pyodbc.pyi
*.xlsx

19
__init__.py

@ -0,0 +1,19 @@
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
def create_app():
app = Flask(__name__)
app.config['SECRET_KEY'] = "6aewrg3IUGHWD21@$t4y,@#sw"
app.config['SQLALCHEMY_DATABASE_URI'] = "postgresql://flask:FR0u9312rad$swib13125@192.168.20.53/hades"
#app.config['SQLALCHEMY_ECHO'] = True
#app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
from .main import main as main_blueprint
app.register_blueprint(main_blueprint)
return app

156
main.py

@ -0,0 +1,156 @@
from flask import Blueprint, request, jsonify
from sqlalchemy import and_
from typing import List, Dict, Any, Optional
from . import db
from .models import *
from http import HTTPStatus
main = Blueprint('main', __name__)
class APIError(Exception):
"""Custom exception for API errors"""
def __init__(self, message: str, status_code: int = 400):
self.message = message
self.status_code = status_code
super().__init__(message)
@main.errorhandler(APIError)
def handle_api_error(error):
return jsonify({'error': error.message}), error.status_code
@main.route('/')
def index():
return """Don't curse the darkness — light a candle!".<br><small>&nbsp;&nbsp;&nbsp;- Hades</small>"""
@main.route('/service_qualify')
def service_qualify():
locid: str = request.args.get('locid', '')
if not locid:
raise APIError("Location ID is required", HTTPStatus.BAD_REQUEST)
try:
location = (
db.session.query(Locations, ParentLocations)
.join(Locations, Locations.ParentLocation_ID == ParentLocations.id)
.filter(Locations.LocationIdentifier == locid)
.first()
)
if not location:
return jsonify({
"locid": locid,
"status": False,
"message": "Location not found"
}), HTTPStatus.NOT_FOUND
plans = (
db.session.query(ParentLocations, Plans, PlanGroups, Plans2Groups)
.join(ParentLocations, ParentLocations.PlanGroup_ID == PlanGroups.id)
.join(Plans2Groups, Plans2Groups.PlanGroup_ID == PlanGroups.id)
.join(Plans, Plans.id == Plans2Groups.Plan_ID)
.filter(ParentLocations.id == location.ParentLocations.id)
.all()
)
plan_list = [plan.Plans.PlanIdentifier for plan in plans]
return jsonify({
"locid": locid,
"status": True,
"serviceclass": location.Locations.ServiceClass,
"newDevelopmentFeeApplicable": 30000,
"unitidentifier": location.Locations.UnitIdentifier,
"streetaddress": location.Locations.StreetAddress,
"suburb": location.Locations.Suburb,
"state": location.Locations.State,
"postcode": location.Locations.Postcode,
"country": location.Locations.Country,
"plans": plan_list,
}), HTTPStatus.OK
except Exception as e:
db.session.rollback()
raise APIError(f"Error processing request: {str(e)}", HTTPStatus.INTERNAL_SERVER_ERROR)
@main.route('/address_search')
def address_search():
unit: str = request.args.get('unit', '')
address: str = request.args.get('address', '')
if not address:
return jsonify({
"address": address,
"status": True,
"count": 0,
"results": [],
}), HTTPStatus.OK
try:
addresses = (
db.session.query(Locations)
.filter(and_(
Locations.StreetAddress.ilike(f'%{address}%'),
Locations.UnitIdentifier.ilike(f'%{unit}%')
))
.limit(5)
.all()
)
results = [
{
'locid': addr.LocationIdentifier,
'address': ','.join((
addr.UnitIdentifier,
addr.StreetAddress,
addr.Suburb,
addr.State,
addr.Postcode,
addr.Country
)),
'status': 'active'
}
for addr in addresses
]
return jsonify({
"address": address,
"status": True,
"count": len(results),
"results": results,
}), HTTPStatus.OK
except Exception as e:
db.session.rollback()
raise APIError(f"Error searching addresses: {str(e)}", HTTPStatus.INTERNAL_SERVER_ERROR)
@main.route('/plan_search')
def plan_search():
plan_req: List[str] = request.args.get('plans', '').split(',')
if not plan_req or plan_req == ['']:
return jsonify({"plans": []}), HTTPStatus.OK
try:
plans = (
db.session.query(Plans)
.filter(Plans.PlanIdentifier.in_(plan_req))
.all()
)
plan_list = [
{
"plan": plan.PlanIdentifier,
"planName": plan.Name,
"cost": plan.Cost,
"description": plan.Description,
"uploadSpeed": plan.UploadSpeed,
"downloadSpeed": plan.DownloadSpeed,
"wholesale": False
}
for plan in plans
]
return jsonify({"plans": plan_list}), HTTPStatus.OK
except Exception as e:
db.session.rollback()
raise APIError(f"Error searching plans: {str(e)}", HTTPStatus.INTERNAL_SERVER_ERROR)

91
models.py

@ -0,0 +1,91 @@
from datetime import datetime, timezone
from flask import abort
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import synonym, relationship
from . import db
class Locations(db.Model):
__tablename__ = 'Locations'
id = db.Column(db.Integer, primary_key=True)
LocationIdentifier = db.Column(db.String(255))
ServiceClass = db.Column(db.Integer)
ParentLocation_ID = db.Column(db.Integer, db.ForeignKey('ParentLocations.id'), nullable=False)
UnitIdentifier = db.Column(db.String(255))
StreetAddress = db.Column(db.String(255))
Suburb = db.Column(db.String(255))
State = db.Column(db.String(255))
Postcode = db.Column(db.String(255))
Country = db.Column(db.String(255))
Created = db.Column(db.DateTime, nullable=False, default=datetime.now(timezone.utc))
intOrder = db.Column(db.Integer)
ONT = db.relationship('ONTs', cascade="all,delete", back_populates='Location')
class ONTs(db.Model):
__tablename__ = 'ONTs'
id = db.Column(db.Integer, primary_key=True)
ONTIdentifier = db.Column(db.String(255))
SerialNumber = db.Column(db.String(255))
MACAddress = db.Column(db.String(255))
OLTInterfacePartition_ID = db.Column(db.Integer, db.ForeignKey('OLTInterfacePartitions.id'), nullable=True)
ONU_ID = db.Column(db.Integer)
ONTModel_ID = db.Column(db.Integer, db.ForeignKey('ONTModels.id'), nullable=False)
Location_ID = db.Column(db.Integer, db.ForeignKey('Locations.id'), nullable=False)
Cached_RX = db.Column(db.Integer)
Created = db.Column(db.DateTime, nullable=False, default=datetime.now(timezone.utc))
Commissioned_RX = db.Column(db.Integer)
Commissioned_Date = db.Column(db.DateTime)
Fibre_Distance = db.Column(db.Integer)
Location = db.relationship('Locations', foreign_keys=Location_ID,cascade="all,delete", back_populates='ONT')
class ParentLocations(db.Model):
__tablename__ = 'ParentLocations'
id = db.Column(db.Integer, primary_key=True)
LegalName = db.Column(db.String(255))
Alias = db.Column(db.String(255))
StreetAddress = db.Column(db.String(255))
Suburb = db.Column(db.String(255))
State = db.Column(db.String(255))
Postcode = db.Column(db.String(255))
MaxONT_dB = db.Column(db.Integer)
PlanGroup_ID = db.Column(db.Integer)
Created = db.Column(db.DateTime, nullable=False, default=datetime.now(timezone.utc))
#Locations = db.relationship('Locations', cascade="all,delete", backref='Locations', lazy='dynamic')
Locations = db.relationship('Locations', primaryjoin='ParentLocations.id == Locations.ParentLocation_ID', backref='Location_ID_link')
class Plans(db.Model):
__tablename__ = 'Plans'
id = db.Column(db.Integer, primary_key=True)
PlanIdentifier = db.Column(db.String(255))
Name = db.Column(db.String(255))
Cost = db.Column(db.Integer)
UploadSpeed = db.Column(db.Integer)
DownloadSpeed = db.Column(db.Integer)
Description = db.Column(db.Text)
class PlanGroups(db.Model):
__tablename__ = 'PlanGroups'
id = db.Column(db.Integer, primary_key=True)
Name = db.Column(db.String(255))
class Plans2Groups(db.Model):
__tablename__ = 'Plans2Groups'
id = db.Column(db.Integer, primary_key=True)
PlanGroup_ID = db.Column(db.Integer)
Plan_ID = db.Column(db.Integer)
class WholesaleCustomers(db.Model):
__tablename__ = 'WholesaleCustomers'
id = db.Column(db.Integer, primary_key=True)
Name = db.Column(db.String(255))
Outer_VLAN = db.Column(db.Integer)
ServicePrefix = db.Column(db.String(5))
Created = db.Column(db.DateTime, nullable=False, default=datetime.now(timezone.utc))
Loading…
Cancel
Save