class documentation

class PartitionedDataSet(AbstractDataSet): (source)

Known subclasses: kedro.io.IncrementalDataSet

View In Hierarchy

PartitionedDataSet loads and saves partitioned file-like data using the underlying dataset definition. For filesystem-level operations it uses fsspec: https://github.com/intake/filesystem_spec.

It also supports advanced features like lazy saving.

Example usage for the YAML API:

station_data:
  type: PartitionedDataSet
  path: data/03_primary/station_data
  dataset:
    type: pandas.CSVDataSet
    load_args:
      sep: '\t'
    save_args:
      sep: '\t'
      index: true
  filename_suffix: '.dat'

Example usage for the Python API:

>>> import pandas as pd
>>> from kedro.io import PartitionedDataSet
>>>
>>> # Create a fake pandas dataframe with 10 rows of data
>>> df = pd.DataFrame([{"DAY_OF_MONTH": str(i), "VALUE": i} for i in range(1, 11)])
>>>
>>> # Convert it to a dict of pd.DataFrame with DAY_OF_MONTH as the dict key
>>> dict_df = {
        day_of_month: df[df["DAY_OF_MONTH"] == day_of_month]
        for day_of_month in df["DAY_OF_MONTH"]
    }
>>>
>>> # Save it as small partitions with DAY_OF_MONTH as the partition key
>>> data_set = PartitionedDataSet(
        path="df_with_partition",
        dataset="pandas.CSVDataSet",
        filename_suffix=".csv"
    )
>>> # This will create a folder `df_with_partition` and save multiple files
>>> # with the dict key + filename_suffix as the filename, e.g. 1.csv, 2.csv, etc.
>>> data_set.save(dict_df)
>>>
>>> # This will create lazy load functions instead of loading data into memory immediately.
>>> loaded = data_set.load()
>>>
>>> # Load all the partitions
>>> for partition_id, partition_load_func in loaded.items():
        # The actual function that loads the data
        partition_data = partition_load_func()
        # Add the processing logic for individual partitions HERE
        print(partition_data)

You can also load multiple partitions from remote storage and combine them like this:

>>> import pandas as pd
>>> from kedro.io import PartitionedDataSet
>>>
>>> # these credentials will be passed both to the 'fsspec.filesystem()' call
>>> # and to the dataset initializer
>>> credentials = {"key1": "secret1", "key2": "secret2"}
>>>
>>> data_set = PartitionedDataSet(
        path="s3://bucket-name/path/to/folder",
        dataset="pandas.CSVDataSet",
        credentials=credentials
    )
>>> loaded = data_set.load()
>>> # assert isinstance(loaded, dict)
>>>
>>> combine_all = pd.DataFrame()
>>>
>>> for partition_id, partition_load_func in loaded.items():
        partition_data = partition_load_func()
        combine_all = pd.concat(
            [combine_all, partition_data], ignore_index=True, sort=True
        )
>>>
>>> new_data = pd.DataFrame({"new": [1, 2]})
>>> # creates "s3://bucket-name/path/to/folder/new/partition.csv"
>>> data_set.save({"new/partition.csv": new_data})
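The lazy saving mentioned above mirrors lazy loading: if the values of the dictionary passed to save() are callables rather than data, each callable is only invoked when its partition is written, so all partitions never need to be held in memory at once. A minimal sketch, reusing the setup of the first example (the make_partition helper and the df_with_lazy_partition path are illustrative names, not part of the API):

>>> import pandas as pd
>>> from kedro.io import PartitionedDataSet
>>>
>>> def make_partition(day: int):
        # Returns a zero-argument callable that builds the partition on demand
        return lambda: pd.DataFrame({"DAY_OF_MONTH": [str(day)], "VALUE": [day]})
>>>
>>> lazy_dict = {str(day): make_partition(day) for day in range(1, 11)}
>>>
>>> data_set = PartitionedDataSet(
        path="df_with_lazy_partition",
        dataset="pandas.CSVDataSet",
        filename_suffix=".csv"
    )
>>> # Each callable is evaluated one at a time during save, producing
>>> # 1.csv, 2.csv, ... just like the eager example above.
>>> data_set.save(lazy_dict)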
Method __init__ Creates a new instance of PartitionedDataSet.
Method _describe Undocumented
Method _exists Undocumented
Method _invalidate_caches Undocumented
Method _join_protocol Undocumented
Method _list_partitions Undocumented
Method _load Undocumented
Method _partition_to_path Undocumented
Method _path_to_partition Undocumented
Method _release Undocumented
Method _save Undocumented
Instance Variable _credentials Undocumented
Instance Variable _dataset_config Undocumented
Instance Variable _dataset_type Undocumented
Instance Variable _filename_suffix Undocumented
Instance Variable _filepath_arg Undocumented
Instance Variable _fs_args Undocumented
Instance Variable _load_args Undocumented
Instance Variable _overwrite Undocumented
Instance Variable _partition_cache Undocumented
Instance Variable _path Undocumented
Instance Variable _protocol Undocumented
Instance Variable _sep Undocumented
Property _filesystem Undocumented
Property _normalized_path Undocumented

Inherited from AbstractDataSet:

Class Method from_config Create a data set instance using the configuration provided.
Method __str__ Undocumented
Method exists Checks whether a data set's output already exists by calling the provided _exists() method.
Method load Loads data by delegation to the provided load method.
Method release Release any cached data.
Method save Saves data by delegation to the provided save method.
Method _copy Undocumented
Property _logger Undocumented
def __init__(self, path: str, dataset: Union[str, Type[AbstractDataSet], Dict[str, Any]], filepath_arg: str = 'filepath', filename_suffix: str = '', credentials: Dict[str, Any] = None, load_args: Dict[str, Any] = None, fs_args: Dict[str, Any] = None, overwrite: bool = False): (source)

Creates a new instance of PartitionedDataSet.

Parameters
path: str
    Path to the folder containing partitioned data. If the path starts with a protocol (e.g. s3://) then the corresponding fsspec concrete filesystem implementation will be used. If the protocol is not specified, fsspec.implementations.local.LocalFileSystem will be used. Note: some concrete implementations are bundled with fsspec, while others (like s3 or gcs) must be installed separately prior to usage of the PartitionedDataSet.
dataset: Union[str, Type[AbstractDataSet], Dict[str, Any]]
    Underlying dataset definition. This is used to instantiate the dataset for each file located inside the path. Accepted formats are: a) an object of a class that inherits from AbstractDataSet; b) a string representing a fully qualified class name to such a class; c) a dictionary with a type key pointing to a string from b), with all other keys passed to the dataset initializer. Credentials for the dataset can be explicitly specified in this configuration.
filepath_arg: str
    Underlying dataset initializer argument that will contain a path to each corresponding partition file. If unspecified, defaults to "filepath".
filename_suffix: str
    If specified, only partitions that end with this string will be processed.
credentials: Dict[str, Any]
    Protocol-specific options that will be passed to fsspec.filesystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem) and the dataset initializer. If the dataset config contains an explicit credentials spec, that spec takes precedence. All possible credential management scenarios are documented here: https://kedro.readthedocs.io/en/stable/data/kedro_io.html#partitioned-dataset-credentials
load_args: Dict[str, Any]
    Keyword arguments to be passed into the find() method of the filesystem implementation.
fs_args: Dict[str, Any]
    Extra arguments to pass into the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).
overwrite: bool
    If True, any existing partitions will be removed.
Raises
DataSetError
    If versioning is enabled for the underlying dataset.
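As a sketch of how dataset, credentials and load_args interact (the bucket name, secret values and the maxdepth setting are purely illustrative), a dictionary-form dataset definition with its own credentials spec takes precedence for the dataset initializer, while the top-level credentials still configure the filesystem:

>>> from kedro.io import PartitionedDataSet
>>>
>>> data_set = PartitionedDataSet(
        path="s3://bucket-name/path/to/folder",
        dataset={
            "type": "pandas.CSVDataSet",
            "load_args": {"sep": "\t"},
            # explicit dataset credentials: used for the dataset initializer
            # instead of the top-level `credentials` below
            "credentials": {"key": "dataset-secret"},
        },
        # passed to fsspec.filesystem() for S3 access
        credentials={"key": "fs-key", "secret": "fs-secret"},
        # forwarded to the filesystem's find() call that lists partitions
        load_args={"maxdepth": 1},
        filename_suffix=".csv",
    )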
def _describe(self) -> Dict[str, Any]: (source)

Undocumented

def _exists(self) -> bool: (source)

Undocumented

def _invalidate_caches(self): (source)

Undocumented

def _join_protocol(self, path: str) -> str: (source)

Undocumented

@cachedmethod(cache=operator.attrgetter('_partition_cache'))
def _list_partitions(self) -> List[str]: (source)

Undocumented

def _load(self) -> Dict[str, Callable[[], Any]]: (source)

Undocumented

def _partition_to_path(self, path: str): (source)

Undocumented

def _path_to_partition(self, path: str) -> str: (source)

Undocumented

def _release(self): (source)

Undocumented

def _save(self, data: Dict[str, Any]): (source)

Undocumented

_credentials = (source)

Undocumented

_dataset_config = (source)

Undocumented

_dataset_type = (source)

Undocumented

_filename_suffix = (source)

Undocumented

_filepath_arg = (source)

Undocumented

_fs_args = (source)

Undocumented

_load_args = (source)

Undocumented

_overwrite = (source)

Undocumented

_partition_cache: Cache = (source)

Undocumented

_path = (source)

Undocumented

_protocol = (source)

Undocumented

_sep = (source)

Undocumented

@property
_filesystem = (source)

Undocumented

@property
_normalized_path: str = (source)

Undocumented