459 lines
15 KiB
Python
459 lines
15 KiB
Python
import csv
|
|
from collections import Counter, defaultdict
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
# MIME type string used to recognise folder items: an item whose mimeType
# equals this value is reported as a FOLDER rather than a FILE.
FOLDER_MIME = "application/vnd.google-apps.folder"
|
|
|
|
|
|
@dataclass
class Item:
    """Per-item metadata taken from the first CSV row seen for an item id.

    Every field holds the whitespace-stripped string read from the CSV;
    nothing is converted to a number or timestamp here.
    """

    item_id: str  # CSV column "id"
    name: str  # CSV column "name"
    depth: str  # CSV column "depth" (kept as text)
    mime_type: str  # CSV column "mimeType"
    modified_time: str  # CSV column "modifiedTime" (kept as text)
    size_bytes: str  # CSV column "size" (kept as text)
    owner_email: str  # CSV column "owners.0.emailAddress"
    owner_name: str  # CSV column "owners.0.displayName"
    path: str  # CSV column "path.0"
    # "sharingUser.emailAddress", falling back to "sharingUser.0.emailAddress"
    sharing_user_email: str
    # "lastModifyingUser.emailAddress", falling back to
    # "lastModifyingUser.0.emailAddress"
    last_modifying_user_email: str
|
|
|
|
|
|
def _norm(value: str) -> str:
|
|
return (value or "").strip()
|
|
|
|
|
|
def _to_bool(value: str) -> bool:
|
|
return _norm(value).lower() in {"true", "1", "yes"}
|
|
|
|
|
|
def file_category(mime_type: str, name: str) -> str:
    """Map an item's MIME type (and, failing that, its file name) to a label.

    The MIME type is checked first: folders, Google-native documents, then
    image/video/audio prefixes, a few exact types (subtitles, PDF, archives)
    and finally any other ``text/*`` type. When none of those match, the
    upper-cased file extension (at most 12 characters) is used, and
    ``"OTHER"`` when the name has no usable extension.
    """
    mime = _norm(mime_type)
    fname = _norm(name)

    if mime == FOLDER_MIME:
        return "FOLDER"

    if mime.startswith("application/vnd.google-apps."):
        google_native = {
            "application/vnd.google-apps.spreadsheet": "G_SHEET",
            "application/vnd.google-apps.document": "G_DOC",
            "application/vnd.google-apps.presentation": "G_SLIDES",
        }
        # Any other Google-native type falls into one generic bucket.
        return google_native.get(mime, "G_APP")

    for prefix, label in (("image/", "IMAGE"), ("video/", "VIDEO"), ("audio/", "AUDIO")):
        if mime.startswith(prefix):
            return label

    # Exact matches are tested before the generic "text/" prefix so that
    # "text/vtt" is reported as a subtitle rather than plain text.
    exact_types = {
        "application/x-subrip": "SUBTITLE",
        "text/vtt": "SUBTITLE",
        "application/pdf": "PDF",
        "application/zip": "ARCHIVE",
        "application/x-7z-compressed": "ARCHIVE",
    }
    if mime in exact_types:
        return exact_types[mime]

    if mime.startswith("text/"):
        return "TEXT"

    # No recognised MIME type: fall back to the text after the last dot.
    _, dot, extension = fname.rpartition(".")
    if dot and extension:
        return extension.lower().upper()[:12]
    return "OTHER"
|
|
|
|
|
|
def _permission_principal(perm: dict) -> str:
|
|
ptype = perm.get("type", "")
|
|
email = perm.get("email", "")
|
|
domain = perm.get("domain", "")
|
|
|
|
if ptype == "user":
|
|
return email or "unknown-user"
|
|
if ptype == "group":
|
|
return f"group:{email}" if email else "group:unknown"
|
|
if ptype == "domain":
|
|
return f"domain:{domain}" if domain else "domain:unknown"
|
|
if ptype == "anyone":
|
|
return "anyone(public)"
|
|
return email or domain or ptype or "UNKNOWN"
|
|
|
|
|
|
def classify_access(permissions: List[dict]) -> str:
    """Classify how widely an item is shared from its permission entries.

    Args:
        permissions: Permission dicts. The keys ``type``, ``role`` and
            ``deleted`` are read; all of them are optional.

    Returns:
        ``"PUBLIC"`` if any active entry has type ``anyone``, else
        ``"DOMAIN"`` if any has type ``domain``, else ``"SHARED_USERS"`` if
        at least one active ``user``/``group`` entry has a role other than
        ``owner``, else ``"PRIVATE"``.
    """
    # Entries flagged as deleted never count towards the access scope.
    active = [p for p in permissions if not p.get("deleted", False)]
    ptypes = {p.get("type") for p in active if p.get("type")}

    if "anyone" in ptypes:
        return "PUBLIC"
    if "domain" in ptypes:
        return "DOMAIN"

    # Only grants to someone other than the owner make an item "shared".
    # Testing the role of each entry (instead of the number of entries)
    # keeps blank permission rows or repeated owner rows from turning an
    # otherwise private item into SHARED_USERS.
    shared_directly = any(
        p.get("type") in {"user", "group"} and p.get("role") != "owner"
        for p in active
    )
    return "SHARED_USERS" if shared_directly else "PRIVATE"
|
|
|
|
|
|
def _is_external_email(email: str) -> bool:
|
|
e = _norm(email).lower()
|
|
return bool(e and "@" in e and not e.endswith("@wolkabout.com"))
|
|
|
|
|
|
def classify_risk(
    *,
    access_scope: str,
    shared_with_count: int,
    external_target_count: int,
    writer_target_count: int,
    writer_without_expiry_count: int,
    owner_differs_from_shared_by: bool,
) -> tuple[str, str, int, str]:
    """Score an item's sharing exposure and explain the score.

    Points are added for a public or domain-wide scope, for the number of
    recipients, for external recipients, for edit rights (with an extra
    charge when any edit right has no expiry) and for sharing attributed to
    someone other than the owner. The total is clamped to 0..100.

    Returns:
        ``(level, reason, score, flags)`` where *level* is ``"HIGH"``
        (score >= 70), ``"MEDIUM"`` (score >= 35) or ``"LOW"``; *reason*
        joins the first three triggered reasons with ``"; "``; and *flags*
        joins every triggered flag with ``"|"``.
    """
    # Each hit is (points, flag, reason). Hits are appended in a fixed
    # order because that order is visible in the returned reason and flags.
    hits: List[Tuple[int, str, str]] = []

    scope_hits = {
        "PUBLIC": (75, "PUBLIC_LINK", "Javno dostupno (anyone link)"),
        "DOMAIN": (60, "DOMAIN_WIDE", "Dostupno celom domenu"),
    }
    if access_scope in scope_hits:
        hits.append(scope_hits[access_scope])

    # (measured value, tiers) pairs. Tiers are (threshold, points, flag,
    # reason), listed highest first; only the first tier reached applies.
    ladders = (
        (
            shared_with_count,
            (
                (20, 45, "MASS_SHARING_20", f"Masovno deljenje ({shared_with_count} primalaca)"),
                (10, 30, "MASS_SHARING_10", f"Siroko deljenje ({shared_with_count} primalaca)"),
                (5, 18, "MULTI_SHARING_5", f"Vise primalaca ({shared_with_count})"),
            ),
        ),
        (
            external_target_count,
            (
                (5, 35, "EXTERNAL_5", f"Deljeno van firme ({external_target_count} eksternih)"),
                (1, 20, "EXTERNAL_1", "Deljeno van firme"),
            ),
        ),
        (
            writer_target_count,
            (
                (3, 22, "WRITER_3", f"Vise edit prava ({writer_target_count})"),
                (1, 12, "WRITER_1", "Postoje edit prava"),
            ),
        ),
        (
            writer_without_expiry_count,
            ((1, 10, "WRITER_NO_EXPIRY", "Edit prava bez isteka"),),
        ),
    )
    for measured, tiers in ladders:
        for threshold, points, flag, why in tiers:
            if measured >= threshold:
                hits.append((points, flag, why))
                break

    if owner_differs_from_shared_by:
        hits.append((8, "RESHARED", "Deljenje nije pokrenuo originalni owner"))

    score = sum(points for points, _, _ in hits)

    # A private item with no recipients gets 8 points back (never below 0);
    # its flags and reasons are left as collected.
    if access_scope == "PRIVATE" and shared_with_count == 0:
        score = max(0, score - 8)

    score = min(max(score, 0), 100)

    level = "LOW"
    if score >= 70:
        level = "HIGH"
    elif score >= 35:
        level = "MEDIUM"

    reasons = [why for _, _, why in hits]
    reason = "; ".join(reasons[:3]) if reasons else "Nema sirokog deljenja"
    flags = "|".join(flag for _, flag, _ in hits)
    return level, reason, score, flags
|
|
|
|
|
|
def parse_gam_raw_csv(raw_csv_path: Path) -> List[dict]:
    """Turn a raw one-row-per-permission CSV export into one row per item.

    The input is presumably produced by GAM (per the function name); only
    the column names read below are relied upon. Rows sharing the same
    ``id`` are merged: item metadata comes from the first row seen for that
    id, and every row contributes one permission entry.

    Args:
        raw_csv_path: Path of the CSV file to read.

    Returns:
        One dict per distinct item id, sorted by
        ``(root_path, path, name)``.
    """
    by_id, permissions_by_id = _read_gam_rows(raw_csv_path)
    curated_rows = [
        _curate_item(base, permissions_by_id[item_id])
        for item_id, base in by_id.items()
    ]
    curated_rows.sort(key=lambda x: (x["root_path"], x["path"], x["name"]))
    return curated_rows


def _first_nonempty(row: dict, *columns: str) -> str:
    """Return the first non-empty normalised value among *columns* of *row*."""
    for column in columns:
        value = _norm(row.get(column))
        if value:
            return value
    return ""


def _read_gam_rows(raw_csv_path: Path) -> "Tuple[Dict[str, Item], Dict[str, List[dict]]]":
    """Read the CSV into per-item metadata and per-item permission lists."""
    by_id: "Dict[str, Item]" = {}
    permissions_by_id: Dict[str, List[dict]] = defaultdict(list)

    # "utf-8-sig" reads plain UTF-8 unchanged and also strips a leading BOM,
    # which would otherwise become part of the first header name.
    with raw_csv_path.open(newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            item_id = _norm(row.get("id"))
            if not item_id:
                continue

            # Item metadata is taken from the first row seen for each id.
            if item_id not in by_id:
                by_id[item_id] = Item(
                    item_id=item_id,
                    name=_norm(row.get("name")),
                    depth=_norm(row.get("depth")),
                    mime_type=_norm(row.get("mimeType")),
                    modified_time=_norm(row.get("modifiedTime")),
                    size_bytes=_norm(row.get("size")),
                    owner_email=_norm(row.get("owners.0.emailAddress")),
                    owner_name=_norm(row.get("owners.0.displayName")),
                    path=_norm(row.get("path.0")),
                    sharing_user_email=_first_nonempty(
                        row,
                        "sharingUser.emailAddress",
                        "sharingUser.0.emailAddress",
                    ),
                    last_modifying_user_email=_first_nonempty(
                        row,
                        "lastModifyingUser.emailAddress",
                        "lastModifyingUser.0.emailAddress",
                    ),
                )

            permissions_by_id[item_id].append(
                {
                    "type": _norm(row.get("permission.type")),
                    "role": _norm(row.get("permission.role")),
                    "email": _norm(row.get("permission.emailAddress")),
                    "domain": _norm(row.get("permission.domain")),
                    "allow_discovery": _to_bool(row.get("permission.allowFileDiscovery", "")),
                    "id": _norm(row.get("permission.id")),
                    "deleted": _to_bool(row.get("permission.deleted", "")),
                    "expiration_time": _norm(row.get("permission.expirationTime")),
                }
            )

    return by_id, permissions_by_id


def _curate_item(base: "Item", raw_permissions: List[dict]) -> dict:
    """Build the curated output row for one item from its permission entries."""
    # Collapse entries that differ only by permission id; the last one wins.
    unique = {}
    for p in raw_permissions:
        key = (
            p["type"],
            p["role"],
            p["email"],
            p["domain"],
            p["allow_discovery"],
            p["expiration_time"],
            p["deleted"],
        )
        unique[key] = p
    permissions = list(unique.values())

    access_scope = classify_access(permissions)
    path = base.path
    root_path = path.split("/", 1)[0] if path else "UNKNOWN"
    parent_path = path.rsplit("/", 1)[0] if "/" in path else root_path

    # Active grants to someone other than the owner; rows with no permission
    # data at all (no type, e-mail or domain) are ignored.
    active_non_owner = [
        p
        for p in permissions
        if not p["deleted"] and p["role"] != "owner" and (p["type"] or p["email"] or p["domain"])
    ]

    shared_targets = sorted({_permission_principal(p) for p in active_non_owner})
    external_target_count = sum(
        1
        for p in active_non_owner
        if _is_external_email(p["email"])
        or (p["type"] == "domain" and p["domain"] and p["domain"].lower() != "wolkabout.com")
    )

    write_roles = {"writer", "organizer", "fileorganizer"}
    writers = [p for p in active_non_owner if p["role"].lower() in write_roles]
    writer_target_count = len(writers)
    writer_without_expiry_count = sum(1 for p in writers if not p["expiration_time"])

    permission_entries = sorted(
        {
            f"{p['type']}:{_permission_principal(p)}:{p['role']}"
            for p in permissions
            if not p["deleted"] and p["type"]
        }
    )
    direct_people = sorted(
        {p["email"] for p in active_non_owner if p["type"] in {"user", "group"} and p["email"]}
    )
    direct_domains = sorted(
        {p["domain"] for p in active_non_owner if p["type"] == "domain" and p["domain"]}
    )

    # Best effort for "who shared":
    # 1) sharingUser (if present), 2) last modifying user, 3) owner.
    shared_by = (
        base.sharing_user_email
        or base.last_modifying_user_email
        or base.owner_email
        or "UNKNOWN"
    )
    shared_with_count = len(shared_targets)
    risk_level, risk_reason, risk_score, risk_flags = classify_risk(
        access_scope=access_scope,
        shared_with_count=shared_with_count,
        external_target_count=external_target_count,
        writer_target_count=writer_target_count,
        writer_without_expiry_count=writer_without_expiry_count,
        owner_differs_from_shared_by=bool(
            base.owner_email
            and shared_by
            and base.owner_email.lower() != shared_by.lower()
        ),
    )

    return {
        "item_id": base.item_id,
        "name": base.name,
        "item_kind": "FOLDER" if base.mime_type == FOLDER_MIME else "FILE",
        "path": path,
        "parent_path": parent_path,
        "root_path": root_path,
        "depth": base.depth,
        "owner_email": base.owner_email,
        "owner_name": base.owner_name,
        "original_owner_email": base.owner_email,
        "shared_by_email": shared_by,
        "last_modifying_user_email": base.last_modifying_user_email,
        "access_scope": access_scope,
        "access_targets": "|".join(shared_targets),
        "direct_user_group_targets": "|".join(direct_people),
        "direct_domain_targets": "|".join(direct_domains),
        "permission_entries": "|".join(permission_entries),
        "shared_with_count": shared_with_count,
        "risk_level": risk_level,
        "risk_reason": risk_reason,
        "risk_score": risk_score,
        "risk_flags": risk_flags,
        "is_shared": str(access_scope != "PRIVATE"),
        "mime_type": base.mime_type,
        "file_category": file_category(base.mime_type, base.name),
        "size_bytes": base.size_bytes,
        "modified_time": base.modified_time,
        "external_target_count": external_target_count,
        "writer_target_count": writer_target_count,
        # Counts every non-deleted entry (owner included) and is emitted as text.
        "permission_count": str(len([p for p in permissions if not p["deleted"]])),
    }
|
|
|
|
|
|
def summarize(curated_rows: List[dict]) -> List[dict]:
    """Count items per (root_path, file_category, access_scope, owner_email).

    Returns one dict per distinct combination with an ``item_count`` field,
    ordered by descending count, then ``root_path``, then ``file_category``.
    """
    tally = Counter(
        (r["root_path"], r["file_category"], r["access_scope"], r["owner_email"])
        for r in curated_rows
    )
    groups = [
        {
            "root_path": root,
            "file_category": category,
            "access_scope": scope,
            "owner_email": owner,
            "item_count": count,
        }
        for (root, category, scope, owner), count in tally.items()
    ]
    # sorted() is stable, so groups tied on all three keys keep the order in
    # which they were first encountered.
    return sorted(
        groups,
        key=lambda g: (-g["item_count"], g["root_path"], g["file_category"]),
    )
|
|
|
|
|
|
def sharing_matrix(curated_rows: List[dict]) -> List[dict]:
    """Expand curated rows into one row per (item, permission entry).

    Each curated row carries ``permission_entries``: ``|``-separated
    ``type:principal:role`` strings. An item without any entry yields a
    single placeholder row (``shared_to="owner-only"``). Entries with fewer
    than two colons are skipped.

    The principal may itself contain a colon (for example
    ``group:team@example.com`` or ``domain:example.com``), so the type is
    taken up to the first colon and the role after the last one; everything
    in between is the principal.

    Returns:
        Rows sorted by ``(path, shared_to, permission_role)``.
    """
    out: List[dict] = []

    for row in curated_rows:
        entries = [e for e in (row.get("permission_entries") or "").split("|") if e]

        if not entries:
            out.append(_matrix_row(row, "owner-only", "user", "owner"))
            continue

        for ent in entries:
            ptype, first_sep, rest = ent.partition(":")
            principal, last_sep, role = rest.rpartition(":")
            if not (first_sep and last_sep):
                # Not in type:principal:role form.
                continue
            out.append(_matrix_row(row, principal, ptype, role))

    out.sort(key=lambda x: (x["path"], x["shared_to"], x["permission_role"]))
    return out


def _matrix_row(row: dict, shared_to: str, ptype: str, role: str) -> dict:
    """Build one sharing-matrix row for *row* and a single grant."""
    return {
        "item_id": row["item_id"],
        "name": row["name"],
        "path": row["path"],
        "original_owner_email": row["original_owner_email"],
        "shared_by_email": row["shared_by_email"],
        "shared_to": shared_to,
        "permission_type": ptype,
        "permission_role": role,
        "access_scope": row["access_scope"],
        "file_category": row["file_category"],
    }
|
|
|
|
|
|
def top_stats(curated_rows: List[dict]) -> dict:
    """Return headline counts for a set of curated rows.

    The result holds the total number of items, the number of folders, the
    number of items per non-private access scope and the eight most common
    ``root_path`` values as ``(root_path, count)`` pairs.
    """
    kinds = [row["item_kind"] for row in curated_rows]
    scopes = [row["access_scope"] for row in curated_rows]
    root_tally = Counter(row["root_path"] for row in curated_rows)

    stats = {"total_items": len(curated_rows)}
    stats["total_folders"] = kinds.count("FOLDER")
    stats["public_items"] = scopes.count("PUBLIC")
    stats["domain_items"] = scopes.count("DOMAIN")
    stats["shared_user_items"] = scopes.count("SHARED_USERS")
    stats["roots"] = root_tally.most_common(8)
    return stats
|