Source code for haystack.search.api

# -*- coding: utf-8 -*-

from past.builtins import long
import logging
import pickle

import json

from haystack.search import searcher
from haystack.outputters import text
from haystack.outputters import python
from haystack import listmodel

log = logging.getLogger('api')


class HaystackError(Exception):
    pass


[docs]def search_record(memory_handler, record_type, search_constraints=None, extended_search=False): """ Search a record in the memory dump of a process represented by memory_handler. The record type must have been imported using haystack functions. If constraints exists, they will be considered during the search. :param memory_handler: IMemoryHandler :param record_type: a ctypes.Structure or ctypes.Union from a module imported by haystack :param search_constraints: IModuleConstraints to be considered during the search :param extended_search: boolean, use allocated chunks only per default (False) :rtype a list of (ctypes records, memory offset) """ if extended_search: my_searcher = searcher.AnyOffsetRecordSearcher(memory_handler, search_constraints) return my_searcher.search(record_type) my_searcher = searcher.RecordSearcher(memory_handler, search_constraints) return my_searcher.search(record_type)
[docs]def search_record_hint(memory_handler, record_type, hint, search_constraints=None, extended_search=False): """ Search a record in the memory dump of a process, but only on the memory page containing the hinted address. The record type must have been imported using haystack functions. If constraints exists, they will be considered during the search. :param memory_handler: IMemoryHandler :param record_type: a ctypes.Structure or ctypes.Union from a module imported by haystack :param search_constraints: IModuleConstraints to be considered during the search :param extended_search: boolean, use allocated chunks only per default (False) :rtype a list of (ctypes records, memory offset) """ hint_mapping = memory_handler.get_mapping_for_address(hint) if extended_search: my_searcher = searcher.AnyOffsetRecordSearcher(memory_handler, my_constraints=search_constraints, target_mappings=[hint_mapping]) return my_searcher.search(record_type) my_searcher = searcher.RecordSearcher(memory_handler, my_constraints=search_constraints, target_mappings=[hint_mapping]) return my_searcher.search(record_type)
# FIXME TODO change for results == ctypes
[docs]def output_to_string(memory_handler, results): """ Transform ctypes results in a string format :param memory_handler: IMemoryHandler :param results: results from the search_record :return: """ if not isinstance(results, list): raise TypeError('Feed me a list of results') parser = text.RecursiveTextOutputter(memory_handler) ret = '[' for ss, addr in results: ret += "# --------------- 0x%lx \n%s" % (addr, parser.parse(ss)) pass ret += ']' return ret
[docs]def output_to_python(memory_handler, results): """ Transform ctypes results in a non-ctypes python object format :param memory_handler: IMemoryHandler :param results: results from the search_record :return: """ if not isinstance(results, list): raise TypeError('Feed me a list of results') # also generate POPOs my_model = memory_handler.get_model() pythoned_modules = my_model.get_pythoned_modules().keys() for module_name, module in my_model.get_imported_modules().items(): if module_name not in pythoned_modules: my_model.build_python_class_clones(module) # parse and generate instances parser = python.PythonOutputter(memory_handler) ret = [(parser.parse(ss), addr) for ss, addr in results] # last check to clean the structure from any ctypes Structure if python.findCtypesInPyObj(memory_handler, ret): raise HaystackError( 'Bug in framework, some Ctypes are still in the return results. Please Report test unit.') return ret
[docs]def output_to_json(memory_handler, results): """ Transform ctypes results in a json format :param memory_handler: IMemoryHandler :param results: results from the search_record :return: """ if not isinstance(results, list): raise TypeError('Feed me a list of results') ret = output_to_python(memory_handler, results) # cirular refs kills it check_circular=False, return json.dumps(ret, default=python.json_encode_pyobj)
[docs]def output_to_pickle(memory_handler, results): """ Transform ctypes results in a pickled format. To load the pickled objects, you need to have haystack in your path. :param memory_handler: IMemoryHandler :param results: results from the search_record :return: """ if not isinstance(results, list): raise TypeError('Feed me a list of results') ret = output_to_python(memory_handler, results) return pickle.dumps(ret)
[docs]def load_record(memory_handler, struct_type, memory_address, load_constraints=None): """ Load a record from a specific address in memory. You could use that function to monitor a specific record from memory after a refresh. :param memory_handler: IMemoryHandler :param struct_type: a ctypes.Structure or ctypes.Union :param memory_address: long :param load_constraints: IModuleConstraints to be considered during loading :return: (ctypes record instance, validated_boolean) """ # FIXME, is number maybe ? if not isinstance(memory_address, long) and not isinstance(memory_address, int): raise TypeError('Feed me a long memory_address') # we need to give target_mappings so not to trigger a heap resolution my_loader = searcher.RecordLoader(memory_handler, load_constraints, target_mappings=memory_handler.get_mappings()) return my_loader.load(struct_type, memory_address)
[docs]def validate_record(memory_handler, instance, record_constraints=None, max_depth=10): """ Validate a loaded record against constraints. :param memory_handler: IMemoryHandler :param instance: a ctypes record :param record_constraints: IModuleConstraints to be considered during validation :return: """ validator = listmodel.ListModel(memory_handler, record_constraints) return validator.load_members(instance, max_depth)