import hashlib
import hmac
import json
import time
from urllib.parse import quote, urlencode

import requests  # type: ignore

SPLYNX_URL = 'https://billing.interphone.com.au'
# NOTE(security): API credentials are hard-coded in source control. They are
# kept here because other modules may import these names, but they should be
# rotated and loaded from the environment or a secrets store instead.
#SPLYNX_KEY = 'c189c78b155ee8e4d389bbcb34bebc05'
#SPLYNX_SECRET = '1454679ddf5c97ea347766709d3ca3bd'
SPLYNX_KEY = 'b4cd90cbea15e7692c940484a9637fc4'
SPLYNX_SECRET = '297ce5c6b7cd5aaf93d8c725fcb49f8f'


class Splynx:
    """Minimal client for the Splynx admin REST API (``/api/2.0/admin``).

    Authenticates with an API key/secret pair, keeps the access and refresh
    tokens on the instance and renews them on demand in ``getToken``.
    """

    # Seconds to wait for any single HTTP request before giving up.
    DEFAULT_TIMEOUT = 30

    def __init__(self, url, key, secret, timeout=DEFAULT_TIMEOUT):
        """Store connection settings; no network traffic happens here.

        Args:
            url: Base URL of the Splynx server, without a trailing path.
            key: API key.
            secret: API secret used to sign authentication requests.
            timeout: Per-request timeout in seconds passed to ``requests``.
        """
        self.url = url
        self.key = key
        self.secret = secret
        self.timeout = timeout
        self.token = None
        self.refresh = None
        self.refreshtime = None      # expiry of the access token
        self.refreshexpire = None    # expiry of the refresh token

    # ------------------------------------------------------------------
    # Authentication helpers
    # ------------------------------------------------------------------
    def _signature(self, nonce):
        """Return the upper-case hex HMAC-SHA256 of ``nonce + key``."""
        digest = hmac.new(
            bytes(self.secret, 'UTF-8'),
            msg=bytes(nonce + self.key, 'UTF-8'),
            digestmod=hashlib.sha256,
        )
        return digest.hexdigest().upper()

    @staticmethod
    def _headers(token):
        """Build the JSON + ``Splynx-EA`` authorization headers for *token*."""
        return {
            'Content-Type': 'application/json',
            'Authorization': 'Splynx-EA (access_token={token})'.format(token=token),
        }

    def _store_tokens(self, response):
        """Parse a token response and copy its fields onto the instance.

        Raises:
            KeyError: if the response JSON lacks one of the token fields.
        """
        payload = json.loads(response.content)
        self.token = payload['access_token']
        self.refresh = payload['refresh_token']
        self.refreshtime = payload['access_token_expiration']
        self.refreshexpire = payload['refresh_token_expiration']

    def _authenticate(self):
        """Request a brand-new token pair using the API key and secret."""
        nonce = str(round(time.time() * 1000))
        data = {
            'auth_type': 'api_key',
            'key': self.key,
            'nonce': nonce,
            'signature': self._signature(nonce),
        }
        ret = requests.post(
            url=self.url + '/api/2.0/admin/auth/tokens',
            data=json.dumps(data),
            headers={'Content-Type': 'application/json'},
            timeout=self.timeout,
        )
        self._store_tokens(ret)

    def _refresh(self):
        """Exchange the stored refresh token for a new token pair."""
        ret = requests.get(
            url=self.url + '/api/2.0/admin/auth/tokens/{refresh}'.format(refresh=self.refresh),
            headers=self._headers(self.token),
            timeout=self.timeout,
        )
        self._store_tokens(ret)

    def getToken(self):
        """Return a usable access token, authenticating or refreshing first.

        A full authentication is done when there is no token yet or the
        refresh token's expiry is not later than the current Unix time;
        otherwise the access token is refreshed once its own expiry passes.
        """
        now = int(time.time())
        if self.token is None or self.refreshexpire <= now:
            self._authenticate()
        elif self.refreshtime <= now:
            self._refresh()
        return self.token

    # ------------------------------------------------------------------
    # Generic HTTP verbs
    # ------------------------------------------------------------------
    def get(self, url):
        """GET ``self.url + url`` and return the decoded JSON body."""
        ret = requests.get(
            url=self.url + url,
            headers=self._headers(self.getToken()),
            timeout=self.timeout,
        )
        return json.loads(ret.content)

    def put(self, url, params):
        """PUT *params* as JSON; return True only for HTTP 202."""
        ret = requests.put(
            url=self.url + url,
            headers=self._headers(self.getToken()),
            data=json.dumps(params),
            timeout=self.timeout,
        )
        return ret.status_code == 202

    def post(self, url, params):
        """POST *params* as JSON.

        Returns:
            The decoded JSON body for HTTP 201, otherwise ``False``.
        """
        ret = requests.post(
            url=self.url + url,
            headers=self._headers(self.getToken()),
            data=json.dumps(params),
            timeout=self.timeout,
        )
        if ret.status_code == 201:
            return json.loads(ret.content)
        return False

    def delete(self, url):
        """DELETE the resource; return True only for HTTP 204."""
        ret = requests.delete(
            url=self.url + url,
            headers=self._headers(self.getToken()),
            timeout=self.timeout,
        )
        return ret.status_code == 204

    # Helper function to build query parameters for Splynx API
    def build_splynx_query_params(self, params):
        """Flatten a nested dict into a bracket-style, URL-encoded query string.

        Dict keys become ``parent[key]`` and list positions become
        ``parent[index]``, recursively at any depth, e.g.
        ``{'a': {'b': ['IN', ['x', 'y']]}}`` yields the keys ``a[b][0]``,
        ``a[b][1][0]`` and ``a[b][1][1]``.

        Args:
            params (dict): Dictionary containing search parameters.

        Returns:
            str: Query string for the API URL ('' when *params* is empty or None).
        """
        if not params:
            return ''

        def flatten(value, key):
            if isinstance(value, dict):
                for sub_key, sub_value in value.items():
                    child = f"{key}[{sub_key}]" if key else sub_key
                    yield from flatten(sub_value, child)
            elif isinstance(value, list):
                for index, item in enumerate(value):
                    yield from flatten(item, f"{key}[{index}]")
            else:
                yield key, value

        return urlencode(list(flatten(params, '')), doseq=True)

    # ------------------------------------------------------------------
    # Customers and services
    # ------------------------------------------------------------------
    @staticmethod
    def _customer_fields(cust, fallback_id='none'):
        """Return ``(name, id)`` from a customer record, tolerating failures.

        ``Customer`` returns the string 'unknown' when the lookup fails, so a
        non-dict value maps to ``('unknown', fallback_id)``.
        """
        if isinstance(cust, dict):
            return cust.get('name', 'unknown'), cust.get('id', fallback_id)
        return 'unknown', fallback_id

    def ServiceStatus(self, service_login):
        """Report whether the internet service with *service_login* is online.

        Returns:
            dict: For an active, online service, the matching online-session
            record extended with ``status='Online'`` and ``customer_name``.
            Otherwise a dict with ``status`` ('Offline', the capitalised
            service status, or 'no service found'), ``customer_name`` and
            ``customer_id``.
        """
        try:
            # Percent-encode the login so characters such as '&' or '#'
            # cannot break or extend the query string.
            login = quote(str(service_login), safe='')
            services = self.get(url=f"/api/2.0/admin/customers/customer/0/internet-services?main_attributes[login]={login}")
            if not services:
                return {
                    'status': 'no service found',
                    'customer_name': 'none',
                    'customer_id': 'none'
                }

            service = services[-1]
            service_status = service.get('status')
            service_customer_id = service.get('customer_id')

            if service_status == "active":
                online = self.get(url="/api/2.0/admin/customers/customers-online") or []
                matches = [d for d in online if str(service_login) in d.values()]
                if matches:
                    detail = matches[0]
                    name, _ = self._customer_fields(self.Customer(detail['customer_id']))
                    detail['status'] = "Online"
                    detail['customer_name'] = name
                    return detail
                status = 'Offline'
            else:
                # Service exists but is not active (could be suspended, terminated, etc.)
                status = str(service_status or 'unknown').capitalize()

            name, customer_id = self._customer_fields(
                self.Customer(service_customer_id), service_customer_id
            )
            return {
                'status': status,
                'customer_name': name,
                'customer_id': customer_id
            }
        except Exception as e:
            print(f"Error checking service status for {service_login}: {str(e)}")
            return {
                'status': 'no service found',
                'customer_name': 'none',
                'customer_id': 'none'
            }

    def Customer(self, customer_id):
        """Fetch one customer record; return the string 'unknown' on failure."""
        try:
            return self.get(url=f"/api/2.0/admin/customers/customer/{customer_id}/")
        except Exception as e:
            # Best-effort lookup: callers treat 'unknown' as "no customer data".
            print(f"Error getting customer {customer_id}: {e}")
            return 'unknown'

    def GetInternetTariffs(self, tariff_id=None):
        """Return one internet tariff when *tariff_id* is truthy, else all of them.

        On any error a ``{'status': 'no Internet Tariff found'}`` dict is returned.
        """
        try:
            path = "/api/2.0/admin/tariffs/internet"
            if tariff_id:
                path = f"{path}/{tariff_id}"
            return self.get(url=path)
        except Exception as e:
            print(f"Error getting Internet Tariffs: {str(e)}")
            return {'status': 'no Internet Tariff found'}

    # ------------------------------------------------------------------
    # Support tickets
    # ------------------------------------------------------------------
    def create_ticket(
        self,
        customer_id: int,
        subject: str,
        priority: str = "medium",
        group_id: int = 2,  # Default to admin group
        status_id: int = 1,
        type_id: int = 1
    ) -> dict:
        """
        Create a support ticket in Splynx.

        The ticket is created without a body; use ``add_ticket_message`` to
        attach message content afterwards.

        Args:
            customer_id (int): Splynx customer ID
            subject (str): Ticket subject line
            priority (str): Ticket priority ('low', 'medium', 'high', 'urgent')
            group_id (int): Admin group ID for assignment
            status_id (int): Value sent as the ticket's ``status_id``
            type_id (int): Value sent as the ticket's ``type_id``

        Returns:
            dict: ``{'success': True, 'ticket_id', 'ticket_data'}`` on success,
            otherwise ``{'success': False, 'error'}``.
        """
        try:
            ticket_data = {
                'customer_id': customer_id,
                'subject': subject,
                'priority': priority,
                'group_id': group_id,
                'status_id': status_id,
                'type_id': type_id
            }
            result = self.post(url="/api/2.0/admin/support/tickets", params=ticket_data)
            if not result:
                print(f"❌ Failed to create Splynx ticket for customer {customer_id}")
                return {
                    'success': False,
                    'error': 'API request failed'
                }
            print(f"✅ Splynx ticket created: #{result.get('id')} for customer {customer_id}")
            return {
                'success': True,
                'ticket_id': result.get('id'),
                'ticket_data': result
            }
        except Exception as e:
            print(f"Error creating Splynx ticket: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def add_ticket_message(self, ticket_id: int, message: str, is_admin: bool = False,
                           hide_for_customer: bool = False, message_type: str = 'message') -> dict:
        """
        Add a message to an existing support ticket.

        Args:
            ticket_id (int): Splynx ticket ID
            message (str): Message content to add
            is_admin (bool): Whether message is from admin (True) or customer (False)
            hide_for_customer (bool): Sent as the message's ``hide_for_customer`` flag
            message_type (str): Sent as ``message_type`` (default 'message')

        Returns:
            dict: ``{'success': True, 'message_id', 'message_data'}`` on success,
            otherwise ``{'success': False, 'error'}``.
        """
        try:
            #'message_type': "note",'mail_to': 'alan@awoodman.net,woody@awoodman.net'
            message_data = {
                'ticket_id': ticket_id,
                'message': message,
                'admin_id': 1 if is_admin else None,  # Set to admin user ID if admin message
                'customer_id': None if is_admin else 0,  # Set appropriately for customer messages
                'hide_for_customer': hide_for_customer,
                'author_type': "api",
                'message_type': message_type,
            }
            result = self.post(url="/api/2.0/admin/support/ticket-messages", params=message_data)
            if not result:
                return {
                    'success': False,
                    'error': 'API request failed'
                }
            return {
                'success': True,
                'message_id': result.get('id'),
                'message_data': result
            }
        except Exception as e:
            print(f"Error adding ticket message: {e}")
            return {
                'success': False,
                'error': str(e)
            }