DataFileStreamProcessor

class openmsistream.DataFileStreamProcessor(config_file, topic_name, **kwargs)

Bases: DataFileStreamHandler, DataFileChunkProcessor, ABC

Consumes DataFileChunk messages into memory and performs some operation(s) when entire files are available. This is a base class that cannot be instantiated on its own.

Parameters:
  • config_file (pathlib.Path) – Path to the config file to use in defining the Broker connection and Consumers

  • topic_name (str) – Name of the topic to which the Consumers should be subscribed

  • output_dir (pathlib.Path, optional) – Path to the directory where the log and csv registry files should be kept (if None a default will be created in the current directory)

  • mode (str, optional) – a string flag determining whether reconstructed data files should have their contents stored only in “memory” (the default, and the fastest), only on “disk” (in the output directory, to reduce the memory footprint), or “both” (for flexibility in processing)

  • datafile_type (DownloadDataFile, optional) – the type of data file that recognized files should be reconstructed as. Default options are set automatically depending on the “mode” argument. (must be a subclass of DownloadDataFile)

  • n_threads (int, optional) – the number of threads/consumers to run

  • consumer_group_id (str, optional) – the group ID under which each consumer should be created

  • filepath_regex – If given, only messages associated with files whose paths match this regex will be consumed

Raises:

ValueError – if datafile_type is not a subclass of DownloadDataFileToMemory, or more specific as determined by the “mode” argument
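The filepath_regex parameter can be pictured with a small standalone sketch. The helper name path_is_selected is hypothetical, and matching the compiled pattern against the POSIX-style path string is an assumption; the library may apply the regex to a different representation of the path.

```python
import re
from pathlib import PurePosixPath


def path_is_selected(filepath, filepath_regex=None):
    """Return True if a file with this path should be consumed (hypothetical helper)."""
    if filepath_regex is None:
        return True  # no filter given: every file is consumed
    # Assumption: the regex is matched against the POSIX form of the path
    return filepath_regex.match(PurePosixPath(filepath).as_posix()) is not None


pattern = re.compile(r".*\.csv$")
candidates = ["run1/a.csv", "run1/b.txt", "c.csv"]
selected = [p for p in candidates if path_is_selected(p, pattern)]
```

With this pattern only the two .csv paths are kept; passing no regex keeps everything.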

process_files_as_read()

Consumes messages and stores their data in memory. Uses several parallel threads to consume messages and calls _process_downloaded_data_file() for each fully read file. Runs until the user inputs a command to shut it down.

Returns:
  • the total number of messages consumed (int)

  • the total number of messages processed (registered in memory) (int)

  • the number of files successfully processed during the run (int)

  • the paths of up to 50 most recent files successfully processed during the run (List)
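A caller might unpack the four documented return values as in the sketch below. The function summarize_run is a hypothetical stand-in, not part of the library. The sketch assumes the values come back together as a tuple in the documented order, and it uses a bounded deque only to illustrate the "up to 50 most recent files" cap; the library's own bookkeeping may differ.

```python
from collections import deque
from pathlib import Path

MAX_RECENT = 50  # the documented cap on reported recent files


def summarize_run(n_read, n_processed, processed_paths):
    """Hypothetical stand-in that mimics the four documented return values."""
    # A deque with maxlen silently drops the oldest entries once it is full
    recent = deque(processed_paths, maxlen=MAX_RECENT)
    return n_read, n_processed, len(processed_paths), list(recent)


paths = [Path(f"file_{i}.dat") for i in range(60)]
n_read, n_processed, n_files, recent_paths = summarize_run(600, 600, paths)
```

Here 60 files are processed but only the last 50 paths are reported, so the file count and the length of the path list can differ.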

_process_message(lock, msg, rootdir_to_set=None)

Process a single message to add it to a file being held in memory until all messages are received.

If the message failed to be decrypted, this method calls _undecryptable_message_callback() and returns.

If the message is the first one consumed for a particular file, or any message other than the last one needed, it registers the file as ‘in_progress’ in the .csv file.

If the message is the last message needed for a file and its contents match the original hash of the file on disk, this method calls _process_downloaded_data_file().

If the call to _process_downloaded_data_file() returns None (success), this method moves the file to the ‘successfully processed’ .csv file, and returns.

If the call to _process_downloaded_data_file() returns an Exception, this method calls _failed_processing_callback(), registers the file as ‘failed’ in the .csv file, and returns.

If the message is the last one needed but the reconstructed contents do not match the hash of the original file on disk, this method calls _mismatched_hash_callback(), registers the file as ‘mismatched_hash’ in the .csv file, and returns.

Parameters:
  • lock (threading.Lock) – Acquiring this threading.Lock object ensures that only one instance of _process_message() is running at once

  • msg (confluent_kafka.KafkaMessage or kafkacrypto.Message) – The received confluent_kafka.KafkaMessage object, or an undecrypted KafkaCrypto message

  • rootdir_to_set (pathlib.Path) – Path to a directory that should be set as the “root” for reconstructed data files (default is the output directory)

Returns:

True if processing the message was successful (file in progress or successfully processed), False otherwise

Return type:

bool
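The branches described for _process_message() can be summarized as a small decision function. This is a sketch of the documented control flow only: classify_message and its boolean inputs are hypothetical, the status labels "undecryptable" and "successfully_processed" are illustrative names, and returning False for an undecryptable message is an assumption based on the documented return value.

```python
def classify_message(decrypted, is_last_chunk, hash_matches, processing_result):
    """Return (status, success) following the documented branches (hypothetical)."""
    if not decrypted:
        # _undecryptable_message_callback() would be called here
        return "undecryptable", False
    if not is_last_chunk:
        # the file is still waiting on more chunks
        return "in_progress", True
    if not hash_matches:
        # _mismatched_hash_callback() would be called here
        return "mismatched_hash", False
    if processing_result is None:
        # _process_downloaded_data_file() returned None, meaning success
        return "successfully_processed", True
    # _failed_processing_callback() would be called here
    return "failed", False


outcome = classify_message(True, True, True, RuntimeError("bad file"))
```

Only the "in progress" and "successfully processed" outcomes count as success, which matches the documented boolean return.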

abstract _process_downloaded_data_file(datafile, lock)

Perform some arbitrary operation(s) on a given data file that has been fully read from the stream. Can optionally lock other threads using the given lock.

Not implemented in the base class.

Returns:

None if processing was successful, an Exception otherwise
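The "return None on success, an Exception otherwise" convention means that errors are returned rather than raised. A minimal standalone sketch of that pattern follows; process_bytes is a hypothetical function operating on raw bytes, not an implementation of the abstract method, and the hashing call is only a placeholder for real work.

```python
import hashlib


def process_bytes(data: bytes):
    """Hypothetical processing step: None on success, the caught Exception otherwise."""
    try:
        if not data:
            raise ValueError("file is empty")
        hashlib.sha512(data).hexdigest()  # placeholder for real work on the contents
    except Exception as exc:
        return exc  # returned to the caller, not re-raised
    return None


ok = process_bytes(b"abc")
err = process_bytes(b"")
```

A caller can then test the result with "is None" to decide whether a file succeeded or should be registered as failed.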

_failed_processing_callback(datafile, lock)

Called when _process_downloaded_data_file() returns an Exception, providing an opportunity for fallback/backup processing in the case of failure.

Does nothing in the base class.

classmethod get_command_line_arguments()

Anything extending this class should be able to access the “treat_undecryptable_as_plaintext” flag

ARGUMENT_PARSER_TYPE

alias of OpenMSIStreamArgumentParser

_mismatched_hash_callback(datafile, lock)

Called when a file reconstructed in memory doesn’t match the hash of its contents originally on disk, providing an opportunity for fallback/backup processing in the case of failure.

Does nothing in the base class.

_undecryptable_message_callback(msg)

Called when a message that could not be decrypted is found. When this happens, the file that the chunk belongs to most likely cannot be processed.

In the base class, this logs a warning.

Parameters:

msg (kafkacrypto.Message) – the kafkacrypto.Message object with undecrypted kafkacrypto.KafkaCryptoMessage objects for its key and/or value

close()

Wrapper around kafkacrypto.KafkaCrypto.close().

classmethod get_init_args_kwargs(parsed_args)

Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.

Parameters:

parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class

Returns:
  • A list of init args

  • A dictionary of init kwargs
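The idea of splitting a parsed namespace into positional and keyword arguments can be sketched as follows. The function init_args_kwargs_from and the namespace attribute names (config, topic_name, n_threads, mode) are hypothetical choices for illustration; the real attribute names depend on the argument parser in use.

```python
import argparse
from pathlib import Path


def init_args_kwargs_from(parsed_args):
    """Hypothetical stand-in: split a namespace into init args and init kwargs."""
    args = [parsed_args.config, parsed_args.topic_name]
    kwargs = {"n_threads": parsed_args.n_threads, "mode": parsed_args.mode}
    return args, kwargs


namespace = argparse.Namespace(
    config=Path("test.config"), topic_name="test", n_threads=2, mode="memory"
)
init_args, init_kwargs = init_args_kwargs_from(namespace)
```

The two results could then be passed to a constructor as SomeClass(*init_args, **init_kwargs).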

property other_datafile_kwargs

Overload this in child classes to define additional keyword arguments that should go to the specific datafile constructor

abstract classmethod run_from_command_line(args: List[str] | None = None) → None

Child classes should implement this function to do whatever it is they do when they run from the command line

Parameters:

args (list) – the list of arguments to send to the parser instead of getting them from sys.argv
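The args parameter follows the usual argparse pattern in which an explicit list replaces sys.argv. The sketch below uses a plain argparse.ArgumentParser with two made-up arguments rather than the library's OpenMSIStreamArgumentParser, and it returns the parsed namespace for inspection, whereas the documented method returns None.

```python
import argparse


def run_from_command_line(args=None):
    """Hypothetical entry point showing how an explicit args list is used."""
    parser = argparse.ArgumentParser(prog="example")
    parser.add_argument("topic_name")
    parser.add_argument("--n_threads", type=int, default=1)
    # parse_args(None) reads sys.argv[1:]; an explicit list overrides that
    return parser.parse_args(args)


parsed = run_from_command_line(["my_topic", "--n_threads", "4"])
```

Passing the list explicitly makes the entry point easy to call from tests or other Python code without touching sys.argv.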