Hp137's picture
feat:added banner notification
fea9527
from src.core.config import settings
import httpx
from google.oauth2 import service_account
import google.auth.transport.requests
# Your Firebase project ID (from project settings)
FCM_PROJECT_ID = settings.FIREBASE_PROJECT_ID # <-- change this
service_account_info = {
"type": settings.FIREBASE_TYPE,
"project_id": settings.FIREBASE_PROJECT_ID,
"private_key_id": settings.FIREBASE_PRIVATE_KEY_ID,
"private_key": settings.FIREBASE_PRIVATE_KEY.replace("\\n", "\n"),
"client_email": settings.FIREBASE_CLIENT_EMAIL,
"client_id": settings.FIREBASE_CLIENT_ID,
"auth_uri": settings.FIREBASE_AUTH_URI,
"token_uri": settings.FIREBASE_TOKEN_URI,
"auth_provider_x509_cert_url": settings.FIREBASE_AUTH_PROVIDER_X509_CERT_URL,
"client_x509_cert_url": settings.FIREBASE_CLIENT_X509_CERT_URL,
"universe_domain": settings.FIREBASE_UNIVERSE_DOMAIN,
}
def get_access_token():
"""Generate OAuth2 access token for FCM HTTP v1."""
scopes = ["https://www.googleapis.com/auth/firebase.messaging"]
credentials = service_account.Credentials.from_service_account_info(
service_account_info, scopes=scopes
)
request = google.auth.transport.requests.Request()
credentials.refresh(request)
return credentials.token
async def send_fcm(
tokens: list[str],
title: str,
body: str,
data: dict | None = None,
priority: str = "high",
):
"""Send push notifications using Firebase HTTP v1."""
if not tokens:
return
access_token = get_access_token()
url = f"https://fcm.googleapis.com/v1/projects/{FCM_PROJECT_ID}/messages:send"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json; UTF-8",
}
# FCM v1 sends only one token per message
for token in tokens:
message = {
"message": {
"token": token,
"data": {"title": title, "body": body, **(data or {})},
"android": {
"priority": priority,
},
}
}
async with httpx.AsyncClient() as client:
res = await client.post(url, json=message, headers=headers)
print("FCM Response:", res.text)