@@ -8,6 +8,5 @@ from responses.responses import OkResponse
 async def get_me(request: Request):
     """ Returns user itself serialized """
     user: User = request.state.user
-    print(user)
     user_model = UserPD.from_orm(user)
     return OkResponse(user_model)
import fitz
from parsers.text_parser import find_ioc, CollectedData
def process_pdf(pdf_path: str) -> CollectedData:
    """
    Extracts the text of every page of the PDF file and collects IoCs from it
    Args:
        pdf_path (str): Path to the PDF file
    Returns:
        CollectedData: CollectedData object with the hashes, IPs and URLs found in the document
    """
    doc = fitz.open(pdf_path)
    collected_data = CollectedData()
    # Collect IoCs page by page and merge them into a single CollectedData object
    for page in doc.pages():
        current_page_collected_data = find_ioc(page.get_text().lower())
        collected_data.hashes.update(current_page_collected_data.hashes)
        collected_data.ips.update(current_page_collected_data.ips)
        collected_data.urls.update(current_page_collected_data.urls)
    doc.close()  # release the file handle so the PDF can be deleted right after parsing
    return collected_data
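
A minimal usage sketch for process_pdf; the file name below is only an illustrative placeholder, not part of the project:

from parsers.pdf_parser import process_pdf

if __name__ == "__main__":
    data = process_pdf("report.pdf")  # hypothetical local PDF used only for this example
    print(f"hashes: {len(data.hashes)}, ips: {len(data.ips)}, urls: {len(data.urls)}")
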
from re import findall
from typing import Set, Iterable
from pydantic import BaseModel, IPvAnyAddress, AnyUrl
class CollectedData(BaseModel):
""" Represents the data collected from text """
hashes: Set[str] = set()
""" Set of hashes found in the text """
ips: Set[IPvAnyAddress] = set()
""" Set of IPs found in the text """
urls: Set[AnyUrl] = set()
""" Set of URLs found in the text """
def filter_already_found_hashes(new_hashes: list, already_found_hashes: list) -> list:
    """ Filters out hashes that are substrings of hashes already collected,
    e.g. a 32-character match that is actually a fragment of a longer SHA-256 hash
    Args:
        new_hashes (list): List of hashes to filter
        already_found_hashes (list): List of longer hashes that were already collected
    """
    return [
        new_hash for new_hash in new_hashes
        if not any(new_hash in already_found_hash for already_found_hash in already_found_hashes)
    ]
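
A quick illustration of the substring check above (both values are made-up hex strings):

sha256_like = "a" * 64             # pretend this SHA-256 hash was already collected
candidates = ["a" * 32, "b" * 32]  # the first one is just a fragment of the hash above
assert filter_already_found_hashes(candidates, [sha256_like]) == ["b" * 32]
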
def filter_local_ips(ips: Iterable[str]) -> list:
    """
    Filter out local IPs from the list of IPs
    Args:
        ips (Iterable[str]): IPs to filter
    """
    filtered_ips = []
    for ip in ips:
        octets = list(map(int, ip.split(".")))
        # Skip private ranges (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16) and loopback (127.0.0.0/8)
        if octets[0] == 10 or (octets[0] == 172 and 16 <= octets[1] <= 31) or (octets[0] == 192 and octets[1] == 168) \
                or octets[0] == 127:
            continue
        filtered_ips.append(ip)
    return filtered_ips
def filter_invalid_ips(ips: Iterable[str]) -> list:
    """
    Filter out invalid IPs from the list of IPs
    Args:
        ips (Iterable[str]): IPs to filter
    """
    filtered_ips = []
    for ip in ips:
        octets = list(map(int, ip.split(".")))
        # Skip addresses with out-of-range octets (the regex accepts values up to 999)
        # as well as reserved first octets and network/broadcast-style last octets
        if any(octet > 255 for octet in octets) or octets[0] == 0 or octets[0] == 255 \
                or octets[3] == 0 or octets[3] == 255:
            continue
        filtered_ips.append(ip)
    return filtered_ips
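
A short illustration of the two IP filters chained the way find_ioc uses them (the addresses are arbitrary examples):

candidates = ["10.0.0.5", "192.168.1.20", "8.8.8.8", "203.0.113.0"]
public_ips = filter_local_ips(candidates)             # drops the two private addresses
assert filter_invalid_ips(public_ips) == ["8.8.8.8"]  # drops the .0-terminated address
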
def find_ioc(text: str) -> CollectedData:
    """
    Finds hashes, IPs and URLs in the text
    Args:
        text (str): Text to search for hashes, IPs and URLs
    Returns:
        CollectedData: CollectedData object with the found hashes, IPs and URLs
    """
    collected_data = CollectedData()
    # Collect hashes from the longest to the shortest, so that shorter matches that are
    # only fragments of an already collected longer hash can be filtered out
    sha256_hashes = findall(r"[a-f0-9]{64}", text)
    sha1_hashes = filter_already_found_hashes(findall(r"[a-f0-9]{40}", text), sha256_hashes)
    md5_hashes = filter_already_found_hashes(findall(r"[a-f0-9]{32}", text), sha256_hashes + sha1_hashes)
    collected_data.hashes = set(sha256_hashes + sha1_hashes + md5_hashes)
    # Collect IPv4 addresses and drop local and invalid ones
    ipv4 = findall(r"[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}", text)
    collected_data.ips = set(filter_invalid_ips(filter_local_ips(ipv4)))
    # Collect URLs; findall returns (scheme, host, port, path) capture-group tuples
    urls = findall(
        r"(http|https|ftp)\://([a-zA-Z0-9\-\.]+\.+[a-zA-Z]{2,3})(:[a-zA-Z0-9]*)?/?([a-zA-Z0-9\-\._\?\,\'/\\"
        r"\+&%\$#\=~]*)[^\.\,\)\(\s]?",
        text)
    for url in urls:
        # url[2] already contains the leading ":" of the port, so it is appended as-is
        collected_data.urls.add(url[0] + "://" + url[1] + url[2] + "/" + url[3])
    return collected_data
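
A self-contained sketch of what find_ioc returns for a small piece of text (all indicator values below are arbitrary examples, not real IoCs):

from parsers.text_parser import find_ioc

sample_text = ("beacon to 203.0.113.10 and payload at http://example.com/payload "
               "with md5 9e107d9d372bb6826bd81d3542a419d6")
found = find_ioc(sample_text)
assert found.ips == {"203.0.113.10"}
assert found.hashes == {"9e107d9d372bb6826bd81d3542a419d6"}
assert "http://example.com/payload" in found.urls
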
import os
from parsers.text_parser import find_ioc
from parsers.pdf_parser import process_pdf
from requests import get
def test_no_collectable_data_in_text():
text = "This is a test text"
collected_data = find_ioc(text)
assert collected_data.hashes == set()
assert collected_data.ips == set()
assert collected_data.urls == set()
def test_collect_hashes():
text = "This is a test text with a hash 5d41402abc4b2a76b9719d911017c592"
collected_data = find_ioc(text)
assert collected_data.hashes == {"5d41402abc4b2a76b9719d911017c592"}
assert collected_data.ips == set()
assert collected_data.urls == set()
def test_collectable_ips():
text = "Some text with an IP. 102.123.253.123"
collected_data = find_ioc(text)
assert collected_data.hashes == set()
assert collected_data.ips == {"102.123.253.123"}
assert collected_data.urls == set()
def test_collectable_ips_local():
text = "Some text with an IP. But this one is local 127.0.0.1"
collected_data = find_ioc(text)
assert collected_data.hashes == set()
assert collected_data.ips == set()
assert collected_data.urls == set()
def test_collectable_urls():
text = "Some text with a URL. https://www.google.com"
collected_data = find_ioc(text)
assert collected_data.hashes == set()
assert collected_data.ips == set()
assert collected_data.urls == {"https://www.google.com/"}
def test_parse_pdf_no_data():
pdf_url = "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf"
pdf_path = "dummy.pdf"
with open(pdf_path, "wb") as f:
f.write(get(pdf_url).content)
collected_data = process_pdf(pdf_path)
assert collected_data.hashes == set()
assert collected_data.ips == set()
assert collected_data.urls == set()
os.remove(pdf_path)
def test_parse_pdf_with_data():
pdf_url = "https://storage.yandexcloud.net/ivanprogramming/Network_Report.pdf"
# This is my report for the MTS Cybersecurity Challenge 2022, uploaded to Yandex Cloud Storage (similar to AWS S3)
pdf_path = "Network_Report.pdf"
with open(pdf_path, "wb") as f:
f.write(get(pdf_url).content)
collected_data = process_pdf(pdf_path)
correct_hashes = {"8cf2cddda8522975a22da3da429339be471234eacc0e11c099d6dcb732cf3cbb",
"f1b789be1126b557240dd0dfe98fc5f3ad6341bb1a5d8be0a954f65b486ad32a",
"d43159c8bf2e1bd866abdbb1687911e2282b1f98a7c063f85ffd53a7f51efed4",
"38c6c5b8d6fa71d9856758a5c0c2ac9d0a0a1450f75bb1004dd988e23d73a312",
"4c957072ab097d3474039f432466cd251d1dc7d91559b76d4e5ead4a8bd499d5",
"3abae6dd2ddae23b2de2ccbcc160a4a5773bef8934d0e6896d50197c3d3c417f"}
for correct_hash in correct_hashes:
assert correct_hash in collected_data.hashes
correct_ips = {
"209.141.55.226",
"85.143.218.7",
"46.249.62.199",
"190.146.112.216",
"87.236.22.142"
}
for correct_ip in correct_ips:
assert correct_ip in collected_data.ips
assert "http://www.rootscafeslc.com/" in collected_data.urls
os.remove(pdf_path)