DataFileStreamReproducer

A DataFileStreamReproducer is a useful base class that can be extended to run arbitrary Python code that computes some sort of “result message” on whole data files as they’re reconstructed from messages on a Kafka topic, and then produce those “result messages” to a different Kafka topic.

class openmsistream.DataFileStreamReproducer(config_file, consumer_topic_name, producer_topic_name, **kwargs)

Bases: DataFileStreamHandler, DataFileChunkReproducer, ABC

Consumes DataFileChunk messages into memory, compute some processing result when entire files are available, and produce that result to a different topic.

This is a base class that cannot be instantiated on its own.

Parameters:
  • config_file (pathlib.Path) – Path to the config file to use in defining the Broker connection, Consumers, and Producers

  • consumer_topic_name (str) – Name of the topic to which the Consumers should be subscribed

  • producer_topic_name (str) – Name of the topic to which the Producer should produce the processing results

  • output_dir (pathlib.Path, optional) – Path to the directory where the log and csv registry files should be kept (if None a default will be created in the current directory)

  • mode (str, optional) – a string flag determining whether reconstructed data files should have their contents stored only in “memory” (the default, and the fastest), only on “disk” (in the output directory, to reduce the memory footprint), or “both” (for flexibility in processing)

  • datafile_type (DownloadDataFile, optional) – the type of data file that recognized files should be reconstructed as. Default options are set automatically depending on the “mode” argument. (must be a subclass of DownloadDataFile)

  • n_producer_threads (int, optional) – the number of producers to run. The total number of producer/consumer threads started is max(n_consumer_threads,n_producer_threads).

  • n_consumer_threads (int, optional) – the number of consumers to run. The total number of producer/consumer threads started is max(n_consumer_threads,n_producer_threads).

  • consumer_group_id (str, optional) – the group ID under which each consumer should be created

  • filepath_regex – If given, only messages associated with files whose paths match this regex will be consumed

Raises:

ValueError – if datafile_type is not a subclass of DownloadDataFileToMemory, or more specific as determined by the “mode” argument

produce_processing_results_for_files_as_read()

Consumes messages in several parallel threads and stores their data in memory. Calls _get_processing_result_message_for_file() for fully-read files to get their processing result messages. Produces processing result messages in several parallel threads as they’re generated. Runs until the user inputs a command to shut it down.

Returns:

the total number of messages consumed

Return type:

int

Returns:

the total number of messages processed (registered in memory)

Return type:

int

Returns:

the number of files reconstructed from the topic

Return type:

int

Returns:

the number of files whose processing results were successfully produced to the Producer topic

Return type:

int

Returns:

the paths of up to 50 most recent files whose processing results were successfully produced to the Producer topic during the run

Return type:

list

producer_callback(err, msg, prodid, filename, rel_filepath, n_total_chunks)

A reference to this method is given as the callback for each call to confluent_kafka.Producer.produce(). It is called for every message upon acknowledgement by the broker, and it uses the file registries in the LOGS subdirectory to keep the information about what has and hasn’t been uploaded current with what has been received by the broker.

Messages associated with an error from the broker will be recomputed and added back to the queue to be produced again, logging an error and registering the file as failed if the message can’t be computed from the datafile.

Messages that are successfully produced will move their associated data files to the “results_produced” csv file.

Parameters:
  • err (confluent_kafka.KafkaError) – The error object for the message

  • msg (confluent_kafka.Message) – The message object

  • prodid (str) – The ID of the producer that produced the message (hex(id(producer)) in memory)

  • filename (str) – The name of the file that was used to create this processing result message

  • rel_filepath (pathlib.Path) – The path to the file that was used to create this processing result message, relative to the DataFileChunk’s root directory

  • n_total_chunks (int) – The total number of chunks in the file used to create this processing result message

_process_message(lock, msg, rootdir_to_set=None)

Process a single message to add it to a file being held in memory until all messages are received.

If the message failed to be decrypted, this method calls _undecryptable_message_callback() and returns.

If the message is the first one consumed for a particular file, or any message other than the last one needed, it registers the file as ‘in_progress’ in the .csv file.

If the message is the last message needed for a file and its contents match the original hash of the file on disk, this method calls _get_processing_result_message_for_file() and enqueues the result to be produced. The file is moved to the ‘results_produced’ .csv file when the produced message is acknowledged by the broker through producer_callback().

If the call to _get_processing_result_message_for_file() returns None, this method calls _failed_computing_processing_result() and returns.

If the message is the last one needed but the contents are somehow different than the original file on disk, this method calls _mismatched_hash_callback(), registers the file as ‘mismatched_hash’ in the .csv file, and returns.

Parameters:
  • lock (threading.Lock) – Acquiring this threading.Lock object ensures that only one instance of _process_message() is running at once

  • msg (confluent_kafka.KafkaMessage or kafkacrypto.Message) – The received confluent_kafka.KafkaMessage object, or an undecrypted KafkaCrypto message

  • rootdir_to_set (pathlib.Path) – Path to a directory that should be set as the “root” for reconstructed data files (default is the output directory)

Returns:

True if processing the message was successful (file in progress or message enqueued to be produced), False otherwise

Return type:

bool

abstract _get_processing_result_message_for_file(datafile, lock)

Given a relative DownloadDataFileToMemory, compute and return a ReproducerMessage object that should be produced as the processing result for the file.

This function should log an error and return None if the processing result fails to be computed.

Not implemented in the base class.

Parameters:
Returns:

message object to be produced (or None if computing it failed for any reason)

Return type:

Producible

_failed_computing_processing_result(datafile, lock)

This function is called when _get_processing_result_message_for_file() returns None because a processing result message could not be computed. It registers the original data file read from the topic as ‘failed’ in the .csv file so that that file will have its messages re-consumed if the program is restarted reading from the same topic with the same Consumer group ID. It then logs a warning and stops tracking the file.

Parameters:
ARGUMENT_PARSER_TYPE

alias of OpenMSIStreamArgumentParser

_mismatched_hash_callback(datafile, lock)

Called when a file reconstructed in memory doesn’t match the hash of its contents originally on disk, providing an opportunity for fallback/backup processing in the case of failure.

Does nothing in the base class.

Parameters:
_undecryptable_message_callback(msg)

This function is called when a message that could not be decrypted is found. If this function is called it is likely that the file the chunk is coming from won’t be able to be processed.

In the base class, this logs a warning.

Parameters:

msg (kafkacrypto.Message) – the kafkacrypto.Message object with undecrypted :class:`kafkacrypto.KafkaCryptoMessage`s for its key and/or value

close()

Wrapper around kafkacrypto.KafkaCrypto.close().

property consumer_topic_name

Name of the topic to which the consumers are subscribed

classmethod get_command_line_arguments()

Anything extending this class should be able to access the “treat_undecryptable_as_plaintext” flag

classmethod get_init_args_kwargs(parsed_args)

Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.

Parameters:

parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class

Returns:

A list of init args

Returns:

A dictionary of init kwargs

property other_datafile_kwargs

Overload this in child classes to define additional keyword arguments that should go to the specific datafile constructor

abstract classmethod run_from_command_line(args: List[str] | None = None) None

Child classes should implement this function to do whatever it is they do when they run from the command line

Parameters:

args (list) – the list of arguments to send to the parser instead of getting them from sys.argv