DataFileStreamReproducer
A DataFileStreamReproducer is a base class that can be extended to run arbitrary Python code computing some sort of “result message” from whole data files as they are reconstructed from messages on a Kafka topic, and then to produce those result messages to a different Kafka topic.
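As a conceptual illustration of the flow this class implements — reconstruct a file from its chunk messages, compute a result, and hand that result off for production to a second topic — here is a stand-alone sketch that stands in for the Kafka machinery with plain Python objects. The chunk layout and result computation here are assumptions for illustration, not OpenMSIStream's actual wire format:

```python
# Stand-alone sketch of the consume -> reconstruct -> compute -> produce flow.
# The chunk/message layout here is illustrative only, not OpenMSIStream's format.

def reconstruct_file(chunks):
    """Reassemble a file's bytes from (index, data) chunk messages."""
    return b"".join(data for _, data in sorted(chunks))

def compute_result_message(filename, file_bytes):
    """Stand-in for the user-supplied processing step; returns a
    (key, value) 'result message' to produce to the second topic."""
    return (filename, f"{filename}: {len(file_bytes)} bytes")

def run_pipeline(chunk_stream, produce):
    """Group incoming chunks by filename; once a file's final chunk
    arrives, compute its result message and produce it."""
    in_progress = {}
    for filename, index, n_total, data in chunk_stream:
        chunks = in_progress.setdefault(filename, [])
        chunks.append((index, data))
        if len(chunks) == n_total:  # all chunks received
            produce(compute_result_message(filename, reconstruct_file(chunks)))
            del in_progress[filename]

# Example: two files' chunks interleaved on the "topic"
produced = []
stream = [
    ("a.txt", 0, 2, b"hello "),
    ("b.txt", 0, 1, b"hi"),
    ("a.txt", 1, 2, b"world"),
]
run_pipeline(stream, produced.append)
print(produced)  # [('b.txt', 'b.txt: 2 bytes'), ('a.txt', 'a.txt: 11 bytes')]
```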
- class openmsistream.DataFileStreamReproducer(config_file, consumer_topic_name, producer_topic_name, **kwargs)
Bases: DataFileStreamHandler, DataFileChunkReproducer, ABC
Consumes DataFileChunk messages into memory, computes a processing result when an entire file is available, and produces that result to a different topic.
This is a base class that cannot be instantiated on its own.
- Parameters:
  - config_file (pathlib.Path) – Path to the config file to use in defining the Broker connection, Consumers, and Producers
  - consumer_topic_name (str) – Name of the topic to which the Consumers should be subscribed
  - producer_topic_name (str) – Name of the topic to which the Producer should produce the processing results
  - output_dir (pathlib.Path, optional) – Path to the directory where the log and csv registry files should be kept (if None, a default will be created in the current directory)
  - mode (str, optional) – a string flag determining whether reconstructed data files should have their contents stored only in “memory” (the default, and the fastest), only on “disk” (in the output directory, to reduce the memory footprint), or “both” (for flexibility in processing)
  - datafile_type (DownloadDataFile, optional) – the type of data file that recognized files should be reconstructed as (must be a subclass of DownloadDataFile). Default options are set automatically depending on the “mode” argument.
  - n_producer_threads (int, optional) – the number of producers to run. The total number of producer/consumer threads started is max(n_consumer_threads, n_producer_threads).
  - n_consumer_threads (int, optional) – the number of consumers to run. The total number of producer/consumer threads started is max(n_consumer_threads, n_producer_threads).
  - consumer_group_id (str, optional) – the group ID under which each consumer should be created
  - filepath_regex – If given, only messages associated with files whose paths match this regex will be consumed
- Raises:
  ValueError – if datafile_type is not a subclass of DownloadDataFileToMemory, or a more specific type as determined by the “mode” argument
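Two of the parameters above have simple, checkable semantics: the total thread count is the max of the two thread settings (not their sum), and filepath_regex gates which files are consumed. A minimal stand-alone sketch, stdlib only; the helper function names are ours, not the library's, and whether the library anchors the regex match is an assumption here:

```python
import re

def total_thread_count(n_consumer_threads, n_producer_threads):
    # Per the docs, the number of producer/consumer threads started is
    # the max of the two settings, not their sum.
    return max(n_consumer_threads, n_producer_threads)

def should_consume(filepath, filepath_regex=None):
    # If a regex is given, only files whose paths match it are consumed.
    # (Unanchored re.search is an assumption for this sketch.)
    return filepath_regex is None or re.search(filepath_regex, filepath) is not None

print(total_thread_count(2, 5))                      # 5
print(should_consume("data/run_01.csv", r"\.csv$"))  # True
print(should_consume("data/notes.txt", r"\.csv$"))   # False
```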
- produce_processing_results_for_files_as_read()
  Consumes messages in several parallel threads and stores their data in memory. Calls _get_processing_result_message_for_file() for fully-read files to get their processing result messages, and produces those messages in several parallel threads as they're generated. Runs until the user inputs a command to shut it down.
  - Returns:
    the total number of messages consumed
  - Return type:
    int
  - Returns:
    the total number of messages processed (registered in memory)
  - Return type:
    int
  - Returns:
    the number of files reconstructed from the topic
  - Return type:
    int
  - Returns:
    the number of files whose processing results were successfully produced to the Producer topic
  - Return type:
    int
  - Returns:
    the paths of up to 50 of the most recent files whose processing results were successfully produced to the Producer topic during the run
  - Return type:
    list
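The five documented return values suggest the method hands them back together; a hedged usage sketch, assuming they come back as a tuple in the listed order (a stub stands in for a real reproducer here — the actual class cannot be instantiated on its own):

```python
# Hedged usage sketch: the five documented return values are assumed to be
# returned as a tuple in the order the docs list them. StubReproducer is a
# stand-in, not the real class.

class StubReproducer:
    def produce_processing_results_for_files_as_read(self):
        # (messages consumed, messages processed, files reconstructed,
        #  results produced, recent result filepaths)
        return 120, 118, 3, 3, ["a.csv", "b.csv", "c.csv"]

repro = StubReproducer()
(n_consumed, n_processed, n_files,
 n_results, recent_paths) = repro.produce_processing_results_for_files_as_read()
print(f"{n_results}/{n_files} files had results produced")  # 3/3 files had results produced
```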
- producer_callback(err, msg, prodid, filename, rel_filepath, n_total_chunks)
  A reference to this method is given as the callback for each call to confluent_kafka.Producer.produce(). It is called for every message upon acknowledgement by the broker, and it uses the file registries in the LOGS subdirectory to keep the information about what has and hasn't been uploaded current with what has been received by the broker.
  Messages associated with an error from the broker will be recomputed and added back to the queue to be produced again; an error is logged and the file is registered as failed if the message can't be recomputed from the datafile.
  Messages that are successfully produced will have their associated data files moved to the “results_produced” csv file.
  - Parameters:
    - err (confluent_kafka.KafkaError) – The error object for the message
    - msg (confluent_kafka.Message) – The message object
    - prodid (str) – The ID of the producer that produced the message (hex(id(producer)) in memory)
    - filename (str) – The name of the file that was used to create this processing result message
    - rel_filepath (pathlib.Path) – The path to the file that was used to create this processing result message, relative to the DataFileChunk's root directory
    - n_total_chunks (int) – The total number of chunks in the file used to create this processing result message
- _process_message(lock, msg, rootdir_to_set=None)
  Process a single message, adding it to a file held in memory until all of that file's messages are received.
  If the message failed to be decrypted, this method calls _undecryptable_message_callback() and returns.
  If the message is the first one consumed for a particular file, or any message other than the last one needed, the file is registered as ‘in_progress’ in the .csv file.
  If the message is the last one needed for a file and the reconstructed contents match the original hash of the file on disk, this method calls _get_processing_result_message_for_file() and enqueues the result to be produced. The file is moved to the ‘results_produced’ .csv file when the produced message is acknowledged by the broker through producer_callback().
  If the call to _get_processing_result_message_for_file() returns None, this method calls _failed_computing_processing_result() and returns.
  If the message is the last one needed but the contents are somehow different from the original file on disk, this method calls _mismatched_hash_callback(), registers the file as ‘mismatched_hash’ in the .csv file, and returns.
  - Parameters:
    - lock (threading.Lock) – Acquiring this threading.Lock object ensures that only one instance of _process_message() is running at once
    - msg (confluent_kafka.KafkaMessage or kafkacrypto.Message) – The received confluent_kafka.KafkaMessage object, or an undecrypted KafkaCrypto message
    - rootdir_to_set (pathlib.Path) – Path to a directory that should be set as the “root” for reconstructed data files (default is the output directory)
  - Returns:
    True if processing the message was successful (file in progress or message enqueued to be produced), False otherwise
  - Return type:
    bool
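The “contents match the original hash of the file on disk” check at the heart of this method can be illustrated with hashlib. The choice of sha512 and the (index, data) chunk shape are assumptions for this sketch, not the library's documented internals:

```python
import hashlib

def reconstructed_matches(original_hash, chunks):
    """Return True if the bytes reassembled from (index, data) chunks
    hash to the same digest as the original file on disk.
    (sha512 is an assumption for this sketch.)"""
    data = b"".join(c for _, c in sorted(chunks))
    return hashlib.sha512(data).digest() == original_hash

original = b"hello world"
good_chunks = [(1, b"world"), (0, b"hello ")]
bad_chunks = [(1, b"world"), (0, b"hello")]  # a byte went missing in transit
ref = hashlib.sha512(original).digest()
print(reconstructed_matches(ref, good_chunks))  # True
print(reconstructed_matches(ref, bad_chunks))   # False
```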
- abstract _get_processing_result_message_for_file(datafile, lock)
  Given a DownloadDataFileToMemory object, compute and return a ReproducerMessage object that should be produced as the processing result for the file.
  This function should log an error and return None if the processing result fails to be computed.
  Not implemented in the base class.
  - Parameters:
    - datafile (DownloadDataFileToMemory) – A DownloadDataFileToMemory object that has received all of its messages from the topic
    - lock (threading.Lock) – Acquiring this threading.Lock object would ensure that only one instance of _get_processing_result_message_for_file() is running at once
  - Returns:
    the message object to be produced (or None if computing it failed for any reason)
  - Return type:
    ReproducerMessage
- _failed_computing_processing_result(datafile, lock)
  This function is called when _get_processing_result_message_for_file() returns None because a processing result message could not be computed. It registers the original data file read from the topic as ‘failed’ in the .csv file, so that the file will have its messages re-consumed if the program is restarted reading from the same topic with the same Consumer group ID. It then logs a warning and stops tracking the file.
  - Parameters:
    - datafile (DownloadDataFileToMemory) – The datafile that should have been used to compute a processing result message
    - lock (threading.Lock) – Acquiring this threading.Lock object would ensure that only one instance of _get_processing_result_message_for_file() is running at once
- ARGUMENT_PARSER_TYPE
alias of OpenMSIStreamArgumentParser
- _mismatched_hash_callback(datafile, lock)
Called when a file reconstructed in memory doesn’t match the hash of its contents originally on disk, providing an opportunity for fallback/backup processing in the case of failure.
Does nothing in the base class.
- Parameters:
  - datafile (DownloadDataFileToMemory) – A DownloadDataFileToMemory object that has received all of its messages from the topic
  - lock (threading.Lock) – Acquiring this threading.Lock object would ensure that only one instance of _mismatched_hash_callback() is running at once
- _undecryptable_message_callback(msg)
This function is called when a message that could not be decrypted is found. If it is called, it is likely that the file the chunk came from will not be able to be processed.
In the base class, this logs a warning.
- Parameters:
  - msg (kafkacrypto.Message) – the kafkacrypto.Message object with undecrypted kafkacrypto.KafkaCryptoMessage objects for its key and/or value
- close()
Wrapper around
kafkacrypto.KafkaCrypto.close()
.
- property consumer_topic_name
Name of the topic to which the consumers are subscribed
- classmethod get_command_line_arguments()
Anything extending this class should be able to access the “treat_undecryptable_as_plaintext” flag
- classmethod get_init_args_kwargs(parsed_args)
Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.
- Parameters:
parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class
- Returns:
A list of init args
- Returns:
A dictionary of init kwargs
- property other_datafile_kwargs
Overload this in child classes to define additional keyword arguments that should go to the specific datafile constructor