DataFileUploadDirectory

class openmsistream.DataFileUploadDirectory(dirpath, config_path, upload_regex=re.compile('^((?!(\\.|.*.log))).*$'), datafile_type=<class 'openmsistream.data_file_io.entity.upload_data_file.UploadDataFile'>, watchdog_lag_time=3, use_polling_observer=False, **kwargs)

Bases: DataFileDirectory, ControlledProcessSingleThread, ProducerGroup, Runnable

Class representing a directory being watched for new files to be added. Files added to the directory are broken into chunks and uploaded as messages to a Kafka topic.

Parameters:
  • dirpath (pathlib.Path) – Path to the directory that should be watched for new files

  • config_path (pathlib.Path) – Path to the config file to use in defining the Broker connection and Producers

  • upload_regex (re.Pattern, optional) – Only files matching this compiled regular expression will be uploaded

  • datafile_type (UploadDataFile, optional) – The type of data file that recognized files should be uploaded as (must be a subclass of UploadDataFile)

  • watchdog_lag_time (int, optional) – Number of seconds that files must remain static (unmodified) before their uploads begin

Raises:

ValueError – if datafile_type is not a subclass of UploadDataFile
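The default upload_regex shown in the signature can be tried out on its own with the standard re module. The sketch below only applies the pattern to plain strings. Which string the class actually passes to the pattern (file name or path) is not stated here, so the sample names and the CSV_ONLY pattern are assumptions made for illustration.

```python
import re

# Default pattern copied from the constructor signature
DEFAULT_UPLOAD_REGEX = re.compile(r"^((?!(\.|.*.log))).*$")

# Hypothetical stricter pattern: non-hidden names ending in ".csv"
CSV_ONLY = re.compile(r"^[^.].*\.csv$")


def matches_upload_regex(name, pattern=DEFAULT_UPLOAD_REGEX):
    """Return True if the given string matches the upload pattern."""
    return pattern.match(name) is not None


# Print the decision for a few illustrative names
for candidate in ["data.csv", ".hidden", "run.log", "catalog.txt", "log.txt"]:
    print(f"{candidate!r}: {matches_upload_regex(candidate)}")
```

With the default pattern, "data.csv" and "log.txt" are accepted, while ".hidden" and "run.log" are rejected. Because the dot in `.*.log` is unescaped, any string containing "log" after at least one other character is rejected too, so "catalog.txt" is also skipped. A custom pattern such as the hypothetical CSV_ONLY above could be passed as upload_regex if that behavior is unwanted.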

ARGUMENT_PARSER_TYPE

alias of OpenMSIStreamArgumentParser

upload_files_as_added(topic_name, n_threads=2, chunk_size=524288, max_queue_size=500, upload_existing=False)

Watch for new files to be added to the directory. Chunk and produce newly added files as messages to a Kafka topic.

Parameters:
  • topic_name (str) – Name of the topic to produce messages to

  • n_threads (int, optional) – The number of threads to use to produce from the shared queue

  • chunk_size (int, optional) – The size of each file chunk in bytes

  • max_queue_size (int, optional) – The maximum size in MB of the internal upload queue (this is distinct from the librdkafka buffering queue)

  • upload_existing (bool, optional) – True if any files that already exist in the directory should be uploaded. If False (the default) then only files added to the directory after startup will be enqueued to the producer
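To get a feel for how chunk_size relates to the number of messages produced per file, the arithmetic can be sketched as follows. This assumes each file is cut into consecutive pieces of chunk_size bytes with a possibly shorter final piece; the text above does not spell out the splitting scheme, and n_chunks_for_file is a hypothetical helper, not part of the library.

```python
DEFAULT_CHUNK_SIZE = 524288  # bytes; the default chunk_size above (512 KiB)


def n_chunks_for_file(file_size_bytes, chunk_size=DEFAULT_CHUNK_SIZE):
    """Number of chunks needed to cover a file, assuming a short final chunk."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if file_size_bytes < 0:
        raise ValueError("file_size_bytes must be non-negative")
    # Integer ceiling division avoids floating-point rounding for large files
    return (file_size_bytes + chunk_size - 1) // chunk_size


# A 10 MiB file with the default chunk size
print(n_chunks_for_file(10 * 1024 * 1024))
```

Under this assumption a 10 MiB file yields 20 chunks at the default size, and a file one byte larger than a single chunk yields 2.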

producer_callback(err, msg, prodid, filename, filepath, n_total_chunks, chunk_i)

A reference to this method is given as the callback for each call to confluent_kafka.Producer.produce(). It is called for every message upon acknowledgement by the broker, and it uses the file registries in the LOGS subdirectory to keep the information about what has and hasn’t been uploaded current with what has been received by the broker.

If the message is the final one to be acknowledged from its corresponding UploadDataFile, the UploadDataFile is registered as “fully produced”.

Parameters:
  • err (confluent_kafka.KafkaError) – The error object for the message

  • msg (confluent_kafka.Message) – The message object

  • prodid (str) – The ID of the producer that produced the message (hex(id(producer)) in memory)

  • filename (str) – The name of the file the message is coming from

  • filepath (pathlib.Path) – The full path to the file the message is coming from

  • n_total_chunks (int) – The total number of chunks in the file this message came from

  • chunk_i (int) – The index of the message’s file chunk in the full list for the file
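The bookkeeping described above, where a file counts as "fully produced" once its last outstanding chunk is acknowledged, can be illustrated with a small in-memory tracker. This is a simplified stand-in and not the library's implementation: the real class records this information in file registries in the LOGS subdirectory, and ChunkAckTracker and its method names are hypothetical.

```python
from collections import defaultdict


class ChunkAckTracker:
    """Hypothetical in-memory stand-in for the on-disk file registries."""

    def __init__(self):
        self._acked = defaultdict(set)  # filepath -> acknowledged chunk indices
        self.fully_produced = set()     # filepaths with every chunk acknowledged
        self.failed = []                # (filepath, chunk_i, err) for failed deliveries

    def on_delivery(self, err, filepath, n_total_chunks, chunk_i):
        """Record one delivery report; return True if the file just completed."""
        if err is not None:
            # A non-None error means the broker did not acknowledge the message
            self.failed.append((filepath, chunk_i, err))
            return False
        self._acked[filepath].add(chunk_i)
        if len(self._acked[filepath]) == n_total_chunks:
            self.fully_produced.add(filepath)
            return True
        return False


tracker = ChunkAckTracker()
# Acknowledgements may arrive in any order
for index in (2, 0, 1):
    finished = tracker.on_delivery(None, "data.csv", 3, index)
print(finished, tracker.fully_produced)
```

Using a set of acknowledged indices keeps the sketch independent of the order in which acknowledgements arrive and of whether chunk indices start at 0 or 1.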

filepath_should_be_uploaded(filepath)

Returns True if a given filepath should be uploaded (if it matches the upload regex)

Parameters:

filepath (pathlib.Path) – A candidate upload filepath

Returns:

True if the filepath is an existing file outside of the LOGS directory whose path matches the upload regex, False otherwise

Return type:

bool

Raises:

TypeError – if filepath isn’t a pathlib.Path object
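The conditions listed above can be sketched as a standalone function. This is an illustration under stated assumptions rather than the library's code: should_upload and its parameters are hypothetical, the logs directory is taken to be a subdirectory named LOGS inside the watched directory, and the pattern is applied to the file name only (the class may match against a different form of the path).

```python
import re
from pathlib import Path

# Default pattern copied from the constructor signature
DEFAULT_UPLOAD_REGEX = re.compile(r"^((?!(\.|.*.log))).*$")


def should_upload(filepath, watched_dir, upload_regex=DEFAULT_UPLOAD_REGEX,
                  logs_subdir_name="LOGS"):
    """Return True if filepath is an existing, non-LOGS file matching the regex."""
    if not isinstance(filepath, Path):
        raise TypeError(f"expected a pathlib.Path, got {type(filepath).__name__}")
    # Only existing regular files are candidates
    if not filepath.is_file():
        return False
    # Skip anything located inside the logs subdirectory
    if (watched_dir / logs_subdir_name) in filepath.parents:
        return False
    # Assumption: the pattern is applied to the file name, not the full path
    return upload_regex.match(filepath.name) is not None
```

For example, should_upload(Path("uploads/data.csv"), Path("uploads")) would return True only if that file exists on disk.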

classmethod get_command_line_arguments()

Anything extending this class should be able to access the “treat_undecryptable_as_plaintext” flag

classmethod get_init_args_kwargs(parsed_args)

Get the list of init arguments and the dictionary of init keyword arguments for this class given a namespace of, for example, parsed arguments.

Parameters:

parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class

Returns:
  • A list of init args

  • A dictionary of init kwargs
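The general pattern of turning a namespace into positional and keyword arguments can be sketched with the standard argparse module. The attribute names on the namespace below mirror the constructor parameters and are assumptions; the names the real argument parser uses may differ, and split_init_args_kwargs is a hypothetical helper.

```python
import argparse
from pathlib import Path


def split_init_args_kwargs(parsed_args):
    """Split a namespace into (list of init args, dict of init kwargs)."""
    # Positional constructor arguments (hypothetical attribute names)
    init_args = [Path(parsed_args.dirpath), Path(parsed_args.config_path)]
    # Optional constructor arguments, passed by keyword
    init_kwargs = {
        "watchdog_lag_time": parsed_args.watchdog_lag_time,
        "use_polling_observer": parsed_args.use_polling_observer,
    }
    return init_args, init_kwargs


namespace = argparse.Namespace(
    dirpath="uploads",
    config_path="upload.config",
    watchdog_lag_time=3,
    use_polling_observer=False,
)
init_args, init_kwargs = split_init_args_kwargs(namespace)
print(init_args, init_kwargs)
```

With a pair like this, an instance of a class cls could be created as cls(*init_args, **init_kwargs).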

classmethod run_from_command_line(args=None)

Run a DataFileUploadDirectory directly from the command line

Calls upload_files_as_added() on a DataFileUploadDirectory defined by command line (or given) arguments

Parameters:

args (list, optional) – The list of arguments to send to the parser
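For use from Python code rather than the command line, a programmatic counterpart might look like the sketch below. It relies only on the constructor and upload_files_as_added() signatures documented above; the wrapper function upload_directory is hypothetical, and actually calling it requires the openmsistream package, a valid config file, and a reachable broker. The call presumably keeps running while the directory is being watched.

```python
from pathlib import Path


def upload_directory(dirpath: Path, config_path: Path, topic_name: str) -> None:
    """Hypothetical wrapper: watch dirpath and upload new files to topic_name."""
    # Imported here so the sketch can be loaded without the package installed
    from openmsistream import DataFileUploadDirectory

    upload_dir = DataFileUploadDirectory(dirpath, config_path, watchdog_lag_time=3)
    # Keyword values below are the documented defaults, written out for clarity
    upload_dir.upload_files_as_added(
        topic_name,
        n_threads=2,
        chunk_size=524288,
        max_queue_size=500,
        upload_existing=False,
    )
```

Passing upload_existing=True instead would also enqueue files already present in the directory at startup.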

property other_datafile_kwargs

Overload this property to send extra keyword arguments to the UploadDataFile constructor for each recognized file (useful if using a custom datafile_type)
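The overloading pattern can be illustrated without the library by using small stand-in classes. Everything in this sketch is hypothetical (StandInDirectory, TaggedFile, TaggedDirectory, and the tag keyword); it only shows how a subclass property can supply extra constructor keyword arguments, not how the real class is implemented.

```python
class StandInDirectory:
    """Hypothetical stand-in for a directory class with a datafile_type."""

    def __init__(self, datafile_type):
        self.datafile_type = datafile_type

    @property
    def other_datafile_kwargs(self):
        # By default, no extra keyword arguments are sent
        return {}

    def make_datafile(self, filepath):
        # Extra keyword arguments are forwarded to the data file constructor
        return self.datafile_type(filepath, **self.other_datafile_kwargs)


class TaggedFile:
    """Hypothetical custom data file type accepting an extra keyword."""

    def __init__(self, filepath, tag=None):
        self.filepath = filepath
        self.tag = tag


class TaggedDirectory(StandInDirectory):
    @property
    def other_datafile_kwargs(self):
        # Overloaded property: every recognized file receives this tag
        return {"tag": "example-tag"}


datafile = TaggedDirectory(TaggedFile).make_datafile("data.csv")
print(datafile.tag)
```

Keeping the extra arguments in a property lets a subclass compute them from its own state at the time each file is recognized.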

property status_msg

A message stating the number of files currently at each stage of progress, up to the maximum number of files

property progress_msg

A message describing the files that are currently recognized as part of the directory

property have_file_to_upload

True if there are any files waiting to be uploaded or in progress

property partially_done_file_paths

A list of filepaths whose uploads are currently in progress

property n_partially_done_files

The number of files currently in the process of being uploaded