niku-client/app/data.py

265 lines
9.1 KiB
Python
Raw Permalink Normal View History

2023-08-01 20:44:17 +00:00
import os
import json
import time
import fcntl
import urllib
import pickle
import config
import requests
def prepare_paths(*paths):
    """
    Make sure the directory part of every path in ``paths`` exists.

    For ``'a/b/c.txt'`` this creates ``'a/b'``; for ``'a/b/'`` (trailing
    slash) it creates ``'a/b'`` itself.  Paths with no directory component
    need nothing and are skipped.
    """
    for path in paths:
        directory = os.path.dirname(path)
        # '' means "current directory": os.makedirs('') would raise.
        if not directory or os.path.exists(directory):
            continue
        try:
            os.makedirs(directory)
        except OSError:
            # Another process may have created it between the check above and
            # this call; only propagate a genuine failure.
            if not os.path.isdir(directory):
                raise
# Root directory of the client's local cache (pcaps, metadata, saved searches).
# expanduser('~') yields $HOME when it is set and otherwise falls back to the
# password database, instead of raising KeyError when HOME is missing.
base_dir = os.path.join(os.path.expanduser('~'), '.niku')
def rel(path):
    """
    Resolve ``path`` against the client data directory ``base_dir``.

    As with ``os.path.join``, an absolute ``path`` is returned as-is.
    """
    resolved = os.path.join(base_dir, path)
    return resolved
# Cache locations inside base_dir.  The trailing '/' on the directory paths
# makes prepare_paths create the directory itself, not just its parent.
pcap_storage = rel('pcap/')
metadata_storage = rel('metadata/')
search_storage = rel('searches/')
# Counter file holding the next search number (read by alloc_search_num).
search_num = rel('search_num')
prepare_paths(pcap_storage, metadata_storage, search_storage)
if not os.path.exists(search_num):
    # Text mode, matching the text-mode 'r+' access in alloc_search_num;
    # writing a str to a 'wb' file fails on Python 3.
    with open(search_num, 'w') as fpo:
        fpo.write('0')
def local_pcap_path(remote_pcap_path):
    """
    Return the local path of a server pcap file, downloading it on first use.

    The file is cached under ``pcap_storage`` at the same relative path it is
    served from (``/pcap/<remote_pcap_path>`` on the server).  The download
    goes to a temporary name next to the destination and is renamed into
    place only once complete.  An interrupted or short download therefore
    cannot leave a truncated file that later calls would serve from cache.
    """
    # NOTE(review): remote_pcap_path is joined unchecked; an absolute path or
    # '..' components would place the file outside pcap_storage.
    local = os.path.join(pcap_storage, remote_pcap_path)
    if not os.path.isfile(local):
        prepare_paths(local)
        url = 'http://%s:%s/pcap/%s' % (config.SERVER_IP, config.SERVER_PORT, remote_pcap_path)
        # Per-process suffix so simultaneous downloads don't share a temp file.
        partial = '%s.%d.part' % (local, os.getpid())
        try:
            # NOTE(review): Python 2's urlretrieve may store an HTTP error page
            # rather than raising on e.g. 404 — confirm whether that matters.
            urllib.urlretrieve(url, filename=partial)
            os.rename(partial, local)
        finally:
            # Only still present if the download or rename failed.
            if os.path.exists(partial):
                os.unlink(partial)
    return local
def load_metadata(ids, reverse=False):
    """
    Return the metadata dict for each id in ``ids``, in the same order.

    Metadata is read from the pickle cache in ``metadata_storage`` when
    present.  The remaining ids are fetched from the server's
    ``/api/metadata`` endpoint in a single request and cached for next time.
    The returned dicts are then decorated in place by ``prettify_metadata``.
    Repeated ids are allowed; every occurrence receives the same dict.

    :param ids: sequence of ids to look up.
    :param reverse: if true, return ``reversed(...)`` of the list instead.
    """
    out = [None] * len(ids)
    positions = {}  # id -> every index at which it appears in ``ids``
    to_search = []  # uncached ids, each once, in first-appearance order
    for i, cid in enumerate(ids):
        if cid in positions:
            positions[cid].append(i)
            continue
        positions[cid] = [i]
        mpath = os.path.join(metadata_storage, str(cid))
        if os.path.exists(mpath):
            with open(mpath, 'rb') as fp:
                out[i] = pickle.load(fp)
        else:
            to_search.append(cid)
    if to_search:
        url = 'http://%s:%s/api/metadata?%s' % (config.SERVER_IP, config.SERVER_PORT, json.dumps(to_search))
        response = urllib.urlopen(url)
        try:
            # One JSON object per response line.
            for line in response:
                mdata = json.loads(line)
                first = positions[mdata['id']][0]
                # Each requested id should come back exactly once.
                assert out[first] is None
                out[first] = mdata
                mpath = os.path.join(metadata_storage, str(mdata['id']))
                with open(mpath, 'wb') as fp:
                    pickle.dump(mdata, fp)
        finally:
            response.close()
    # Repeated ids share the dict found for their first occurrence.
    for indices in positions.values():
        for i in indices[1:]:
            out[i] = out[indices[0]]
    assert all(x is not None for x in out)
    prettify_metadata(out)
    return out if not reverse else reversed(out)
def _fetch_json_lines(url):
    """GET ``url`` and decode every line of the response body as JSON."""
    # NOTE(review): no timeout is passed, so an unresponsive server blocks here.
    return [json.loads(line) for line in requests.get(url).text.splitlines()]


def prettify_metadata(md_list):
    """
    Add human-readable fields to each metadata dict in ``md_list``, in place.

    Adds ``src_boot_name``, ``service``, ``timestamp_str``, ``duration_str``
    and ``local_filename``.  Host and service name mappings are fetched from
    the server on each call; nothing is fetched for an empty list.
    """
    if not md_list:
        return
    server = 'http://' + config.SERVER_IP + ':' + str(config.SERVER_PORT)
    # Lists, not lazy map objects: they are consulted for every result.
    hosts = _fetch_json_lines(server + '/api/hosts/get')
    services = _fetch_json_lines(server + '/api/services/get')

    # Index each mapping once; setdefault keeps the first match for a key,
    # which is what a front-to-back linear scan would find.
    host_names = {}
    for host in hosts:
        host_names.setdefault(host['boot_time'], host['name'])
    service_names = {}
    for service in services:
        key = (service['host'], service['port'], service['protocol'])
        service_names.setdefault(key, service['name'])

    for result in md_list:
        # Map unfriendly boot-time back to friendly host name.
        src_boot = result['src_boot']
        result['src_boot_name'] = host_names.get(src_boot, str(src_boot))
        # Map unfriendly (dst, dst_port, protocol) back to friendly service name.
        key = (result['dst'], result['dst_port'], result['protocol'])
        if key in service_names:
            result['service'] = service_names[key]
        else:
            result['service'] = ':'.join(str(part) for part in key)
        # UNIX timestamp -> display string, shifted by the configured offset.
        timestamp = result['timestamp']
        result['timestamp_str'] = time.strftime("%D - %H:%M:%S", time.gmtime(timestamp + config.TIMEZONE_OFFSET)) if timestamp else None
        # Duration -> MM:SS.
        duration = float(result['duration'])
        result['duration_str'] = '%02d' % (duration / 60) + ':' + '%02d' % (duration % 60)
        result['local_filename'] = os.path.join(pcap_storage, result['filename'])
def alloc_search_num(path=None):
    """
    Reserve and return the next search number.

    The counter file holds the next free number as text; an empty file counts
    as 0.  It is read and rewritten while holding an exclusive ``fcntl.lockf``
    lock, so concurrent processes are serialized on it.

    :param path: counter file to use; defaults to the module's ``search_num``.
    :returns: the reserved number (the value stored before incrementing).
    """
    if path is None:
        path = search_num
    with open(path, 'r+') as fp:
        fcntl.lockf(fp.fileno(), fcntl.LOCK_EX)
        try:
            val_str = fp.read()
            val = int(val_str) if val_str else 0
            fp.seek(0)
            fp.truncate()
            fp.write(str(val + 1))
            # Push the new value to the OS before unlocking.  Otherwise it sits
            # in Python's buffer until close(), and another process could take
            # the lock and read a stale or truncated counter.
            fp.flush()
        finally:
            fcntl.lockf(fp.fileno(), fcntl.LOCK_UN)
    return val
class Search(object):
    """
    A saved search: the result ids for a set of server-side search params.

    Each (search number, sorting column) pair is stored as its own file under
    ``search_storage`` named ``"<num>-<sorting>"`` (format: see ``_save``).
    ``results`` holds the ids for ``sorting``.  ``num_results`` is the stored
    count; it differs from ``len(results)`` only after a ``partial`` load,
    which skips reading the ids.
    """

    def __init__(self, num, params, results, timestamp, sorting):
        """
        :param num: search number (``search`` obtains it from alloc_search_num).
        :param params: dict of search parameters sent to the server.
        :param results: list or tuple of integer result ids.
        :param timestamp: time the results were fetched (int or float).
        :param sorting: name of the column the results are saved under.
        """
        # Python 2 type checks (``long`` / ``unicode``), as in the rest of
        # this module.
        assert type(num) in (int, long)
        assert type(params) is dict
        assert type(results) in (list, tuple)
        assert type(timestamp) in (int, long, float)
        assert type(sorting) in (str, unicode)
        self.num = num
        self.params = params
        self.results = results
        self.num_results = len(self.results)
        self.timestamp = float(timestamp)
        self.sorting = sorting

    @staticmethod
    def _fetch_ids(endpoint, query):
        """GET ``/api/<endpoint>?<query>`` from the server; one int id per line."""
        response = urllib.urlopen('http://%s:%s/api/%s?%s' % (
            config.SERVER_IP, config.SERVER_PORT, endpoint, query))
        try:
            return [int(line) for line in response]
        finally:
            response.close()

    @staticmethod
    def _path(num, sorting):
        """Return the file path for search ``num`` saved under ``sorting``."""
        return os.path.join(search_storage, '%s-%s' % (num, sorting))

    def _save(self):
        """
        Write this search to its file.

        The file contains the timestamp (repr), the params as JSON and the
        result count, followed by one result id per line.
        """
        with open(self._path(self.num, self.sorting), 'wb') as fp:
            fp.write('%r\n' % self.timestamp)
            fp.write('%s\n' % json.dumps(self.params))
            fp.write('%s\n' % self.num_results)
            for cid in self.results:
                fp.write('%s\n' % cid)

    @classmethod
    def load(cls, num, sorting, partial=False):
        """
        Load the search with the given identifier from disk.

        If nothing is saved under ``sorting``, the ``'timestamp'`` file is
        loaded and re-sorted via ``sort``, which also saves the new ordering.
        With ``partial=True`` only the header is read: ``results`` is empty
        and ``num_results`` is the stored count.

        :param num: search number, as an int or as the str taken from a filename.
        :raises ValueError: if the search does not exist, or if a partial load
            asks for an ordering that has not been saved.
        """
        path = cls._path(num, sorting)
        if os.path.exists(path):
            with open(path, 'rb') as fp:
                timestamp = float(fp.readline())
                params = json.loads(fp.readline())
                num_results = int(fp.readline())
                results = []
                if not partial:
                    for line in fp:
                        results.append(int(line))
            out = cls(int(num), params, results, timestamp, sorting)
            out.num_results = num_results
            return out
        elif sorting == 'timestamp':
            # %s, not %d: loadall passes ``num`` as a str.
            raise ValueError('No such search %s' % num)
        elif partial:
            # Producing a new ordering needs the ids, which a partial load
            # does not read.
            raise ValueError('..why')
        else:
            start = cls.load(num, 'timestamp')
            return start.sort(sorting)

    @classmethod
    def search(cls, params):
        """
        Perform a search. Will save to disk.

        Allocates a new search number, queries the server with ``params`` and
        saves the results under the ``'timestamp'`` sorting.  ``params`` must
        not contain ``'order_by'``; use ``sort`` for other orderings.
        """
        assert 'order_by' not in params
        new_num = alloc_search_num()
        timestamp = time.time() - config.TIMEZONE_OFFSET
        results = cls._fetch_ids('search', json.dumps(params))
        out = cls(new_num, params, results, timestamp, 'timestamp')
        out._save()
        return out

    @classmethod
    def loadall(cls, partial=False):
        """
        Load all cached searches. Pass partial=True to avoid loading all the ids.

        Yields one Search per search number, using whichever of its saved
        orderings ``os.listdir`` lists first.  Directory entries without a
        ``-`` separator are skipped.
        """
        seen_ids = set()
        for name in os.listdir(search_storage):
            # Split on the first '-' only; the rest is the sorting column.
            sid, sep, sorting = name.partition('-')
            if not sep:
                continue
            if sid not in seen_ids:
                yield cls.load(sid, sorting, partial=partial)
                seen_ids.add(sid)

    def delete(self):
        """
        Remove this search from the filesystem (every saved ordering).
        """
        for name in os.listdir(search_storage):
            if name.startswith('%d-' % self.num):
                os.unlink(os.path.join(search_storage, name))

    @property
    def update_possible(self):
        """
        Returns True if this search can be updated.

        That is the case when the search has no timestamp range, or when its
        range ends after the time the results were fetched.
        """
        ts_range = self.params.get('timestamp')
        if ts_range is not None:
            return ts_range[1] > self.timestamp
        return True

    def update(self):
        """
        Call this to check for updates to the search. Will save to disk.

        Fetches results from ``self.timestamp`` up to the end of the search's
        own timestamp range (or an open end) and appends them.  Every saved
        ordering is then replaced by this one.

        :raises ValueError: if the search was loaded with ``partial=True``.
        """
        if self.num_results != len(self.results):
            raise ValueError("Partially loaded search cannot be updated")
        new_params = dict(self.params)
        ts_range = self.params.get('timestamp')
        # An explicit None range is treated like a missing one, matching
        # update_possible, instead of failing on ``None[1]``.
        end = ts_range[1] if ts_range is not None else 2**32 - 1
        new_params['timestamp'] = [self.timestamp, end]
        new_timestamp = time.time() - config.TIMEZONE_OFFSET
        new_results = self._fetch_ids('search', json.dumps(new_params))
        if not isinstance(self.results, list):
            # __init__ also accepts tuples, which cannot be extended.
            self.results = list(self.results)
        # NOTE(review): new ids are appended at the end, so for a sorting other
        # than 'timestamp' the list is presumably no longer fully ordered.
        self.results.extend(new_results)
        self.timestamp = float(new_timestamp)
        self.num_results = len(self.results)
        # Other saved orderings are now stale: drop them all, re-save this one.
        self.delete()
        self._save()

    def sort(self, column):
        """
        Return this search ordered by ``column``, saved under that sorting.

        Small, fully loaded searches are reordered by sending their ids to
        ``/api/sort``.  Otherwise the search is re-run on the server with
        ``order_by``, which also refreshes its timestamp.
        """
        # A partially loaded search has an empty ``results`` list; sending it
        # would save an empty ordering, so it takes the re-run path instead.
        if self.num_results < 1000 and len(self.results) == self.num_results:
            query = json.dumps({"ids": self.results, "order_by": column})
            # Presumably keeps the GET URL within typical length limits.
            if len(query) < 2000:
                results = self._fetch_ids('sort', query)
                out = type(self)(self.num, self.params, results, self.timestamp, column)
                out._save()
                return out
        params = dict(self.params)
        params['order_by'] = column
        timestamp = time.time() - config.TIMEZONE_OFFSET
        results = self._fetch_ids('search', json.dumps(params))
        params.pop('order_by')
        out = type(self)(self.num, params, results, timestamp, column)
        out._save()
        return out