S3TransferStreamProcessor

class openmsistream.S3TransferStreamProcessor(bucket_name, config_path, topic_name, **kwargs)

Bases: DataFileStreamProcessor

Reconstructs data files read as messages from a topic, holds them in memory or on disk, and transfers them to an S3 bucket once all of their messages have been received.
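The reassembly step can be pictured with a minimal, library-independent sketch. It assumes each message carries a file path, a zero-based chunk index, the total chunk count, and the chunk bytes. The class `ChunkReassembler` and the `upload` callback are hypothetical stand-ins, not part of the openmsistream API. A real transfer would go through an S3 client rather than a plain Python callable.

```python
from typing import Callable, Dict


class ChunkReassembler:
    """Hypothetical stand-in: buffers chunks in memory per file and hands the
    complete file to an upload callback once every chunk has arrived."""

    def __init__(self, upload: Callable[[str, bytes], None]) -> None:
        self._upload = upload
        # filepath -> {zero-based chunk index: chunk bytes}
        self._chunks: Dict[str, Dict[int, bytes]] = {}

    def add_chunk(self, filepath: str, index: int, n_chunks: int, data: bytes) -> bool:
        """Register one chunk; return True if this chunk completed the file."""
        chunks = self._chunks.setdefault(filepath, {})
        chunks[index] = data  # a duplicate delivery simply overwrites the same slot
        if len(chunks) < n_chunks:
            return False  # still waiting for more chunks of this file
        # All chunks present: join them in index order, free the buffer, "transfer"
        content = b"".join(chunks[i] for i in range(n_chunks))
        del self._chunks[filepath]
        self._upload(filepath, content)
        return True


if __name__ == "__main__":
    uploaded: Dict[str, bytes] = {}
    reassembler = ChunkReassembler(lambda path, content: uploaded.update({path: content}))
    print(reassembler.add_chunk("data/a.bin", 1, 2, b"world"))   # False
    print(reassembler.add_chunk("data/a.bin", 0, 2, b"hello "))  # True
    print(uploaded)  # {'data/a.bin': b'hello world'}
```

Chunks may arrive out of order, which is why the sketch keys them by index and only joins them once the count is complete.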

Parameters:
  • bucket_name (str) – Name of the S3 bucket into which reconstructed files should be transferred

  • config_path (pathlib.Path) – Path to the config file to use in defining the Broker connection and Consumers

  • topic_name (str) – Name of the topic to which the Consumers should be subscribed

  • filepath_regex – If given, only messages associated with files whose paths match this regex will be consumed
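The filtering that `filepath_regex` describes amounts to a regular-expression test on each message's file path before its data is kept. The sketch below assumes the path is tested with `re.match` (anchored at the start of the path). This section does not say whether the library uses `match`, `search`, or `fullmatch`, so treat the helper `should_consume` as hypothetical.

```python
import re
from typing import Optional, Union


def should_consume(filepath: str, filepath_regex: Optional[Union[str, re.Pattern]] = None) -> bool:
    """Hypothetical helper: keep a message only if its file path matches the regex.

    With no regex given, every message is kept, mirroring the "If given" wording.
    """
    if filepath_regex is None:
        return True
    # re.compile accepts either a pattern string or an already-compiled pattern
    pattern = re.compile(filepath_regex)
    return pattern.match(filepath) is not None


print(should_consume("run_01/image.tif", r".*\.tif$"))  # True
print(should_consume("run_01/notes.txt", r".*\.tif$"))  # False
```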

make_stream()

Runs process_files_as_read() to reconstruct files in memory and transfer completed files to the S3 bucket. Runs until the user inputs a command to shut it down.

Returns:

a tuple of three values:

  • the total number of messages consumed (int)

  • the total number of messages processed (registered in memory) (int)

  • the paths of files successfully transferred to the S3 bucket during the run (list)

Return type:

tuple
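The three values returned by make_stream() can differ because not every consumed message is registered. A message skipped by `filepath_regex`, for example, counts as consumed but not processed. The following sketch is a simplified, single-threaded model of that bookkeeping, not the library's implementation. It assumes messages arrive as `(filepath, index, n_chunks, data)` tuples from any iterable, uses a `threading.Event` as a stand-in for the user's shutdown command, and treats `upload` as a hypothetical transfer callback.

```python
import re
import threading
from typing import Callable, Dict, Iterable, List, Optional, Tuple

# (file path, zero-based chunk index, total chunks, chunk bytes)
Message = Tuple[str, int, int, bytes]


def stream_sketch(
    messages: Iterable[Message],
    upload: Callable[[str, bytes], None],
    shutdown: threading.Event,
    filepath_regex: Optional[str] = None,
) -> Tuple[int, int, List[str]]:
    """Return (n_read, n_processed, transferred_paths)."""
    pattern = re.compile(filepath_regex) if filepath_regex else None
    buffers: Dict[str, Dict[int, bytes]] = {}
    n_read, n_processed = 0, 0
    transferred: List[str] = []
    for filepath, index, n_chunks, data in messages:
        if shutdown.is_set():  # stand-in for the user's shutdown command
            break
        n_read += 1  # every consumed message is counted
        if pattern is not None and pattern.match(filepath) is None:
            continue  # consumed but not processed
        n_processed += 1
        chunks = buffers.setdefault(filepath, {})
        chunks[index] = data
        if len(chunks) == n_chunks:  # file complete: reassemble and transfer
            upload(filepath, b"".join(chunks[i] for i in range(n_chunks)))
            transferred.append(filepath)
            del buffers[filepath]
    return n_read, n_processed, transferred


msgs = [("a.tif", 0, 2, b"ab"), ("skip.txt", 0, 1, b"x"), ("a.tif", 1, 2, b"cd")]
n_read, n_processed, paths = stream_sketch(msgs, lambda p, c: None, threading.Event(), r".*\.tif$")
print(n_read, n_processed, paths)  # 3 2 ['a.tif']
```

Unpacking the result into three names mirrors how a caller would consume a three-value return.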

classmethod get_command_line_arguments()

Any class extending this one should be able to access the “treat_undecryptable_as_plaintext” flag.

classmethod get_init_args_kwargs(parsed_args)

Get the list of init arguments and the dictionary of init keyword arguments for this class, given a namespace such as the result of parsing command-line arguments.

Parameters:

parsed_args (argparse.Namespace) – A namespace containing entries needed to determine the init args and kwargs for this class

Returns:

A tuple of two values:

  • A list of init args

  • A dictionary of init kwargs
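Splitting a parsed namespace into positional and keyword constructor arguments can be sketched with `argparse` alone. The option names below (`bucket_name`, `--config`, `--topic_name`, `--filepath_regex`) and their defaults are assumptions chosen to mirror the constructor parameters, not the real command-line interface, and `init_args_kwargs_sketch` is a hypothetical stand-in for the classmethod.

```python
import argparse
from typing import Any, Dict, List, Tuple


def init_args_kwargs_sketch(parsed_args: argparse.Namespace) -> Tuple[List[Any], Dict[str, Any]]:
    """Hypothetical: positional init args in constructor order, plus optional kwargs."""
    args = [parsed_args.bucket_name, parsed_args.config, parsed_args.topic_name]
    kwargs: Dict[str, Any] = {}
    if parsed_args.filepath_regex is not None:
        kwargs["filepath_regex"] = parsed_args.filepath_regex  # only pass it if given
    return args, kwargs


# Hypothetical parser; option names and defaults are illustrative only
parser = argparse.ArgumentParser()
parser.add_argument("bucket_name")
parser.add_argument("--config", default="test.config")
parser.add_argument("--topic_name", default="test_topic")
parser.add_argument("--filepath_regex", default=None)

args, kwargs = init_args_kwargs_sketch(parser.parse_args(["my-bucket", "--filepath_regex", r".*\.tif$"]))
print(args)    # ['my-bucket', 'test.config', 'test_topic']
print(kwargs)  # {'filepath_regex': '.*\\.tif$'}
```

Leaving optional keyword arguments out of `kwargs` when they were not supplied lets the constructor's own defaults apply.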

classmethod run_from_command_line(args=None)

Run an S3TransferStreamProcessor directly from the command line

Calls make_stream() on an S3TransferStreamProcessor defined by command-line (or given) arguments

Parameters:

args (list, optional) – the list of arguments to send to the parser instead of getting them from sys.argv
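The `args` parameter follows the standard `argparse` convention: `ArgumentParser.parse_args(None)` reads `sys.argv[1:]`, while an explicit list is parsed instead, which makes the entry point easy to drive from tests or other scripts. The sketch below shows that pattern end to end. `build_parser`, `run_from_command_line_sketch`, and the option names are hypothetical, and the stand-in only echoes its inputs instead of constructing a processor and consuming from a broker.

```python
import argparse
from typing import List, Optional, Tuple


def build_parser() -> argparse.ArgumentParser:
    # Hypothetical option names mirroring the constructor parameters
    parser = argparse.ArgumentParser(description="S3 transfer stream processor (sketch)")
    parser.add_argument("bucket_name")
    parser.add_argument("--config", default="test.config")
    parser.add_argument("--topic_name", default="test_topic")
    return parser


def run_from_command_line_sketch(args: Optional[List[str]] = None) -> Tuple[int, int, List[str]]:
    # args=None makes argparse fall back to sys.argv[1:]
    parsed = build_parser().parse_args(args)
    # Stand-in for constructing the processor and calling make_stream()
    print(f"Would consume {parsed.topic_name!r} into bucket {parsed.bucket_name!r}")
    n_read, n_processed = 0, 0
    transferred: List[str] = []
    print(f"{n_read} messages read, {n_processed} processed, {len(transferred)} files transferred")
    return n_read, n_processed, transferred


run_from_command_line_sketch(["my-bucket", "--topic_name", "raw_files"])
```

A script entry point would call `run_from_command_line_sketch()` with no arguments so that the real command line is parsed.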