Autoscaling PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "autoscaling": false,
    ...
}

Behavior #

The autoscaling attribute in a Pachyderm Pipeline Spec specifies whether the pipeline should automatically scale up or down based on the processing load.

If autoscaling is set to true, Pachyderm monitors the pipeline’s processing load and scales the number of workers up or down to keep up with demand. This helps the pipeline run efficiently without wasting resources when the load is low.

  • autoscaling is set to false by default.
  • The maximum number of workers is controlled by the parallelismSpec.
  • A pipeline with no outstanding jobs will go into standby. A pipeline in a standby state consumes no resources.

When to Use #

You should consider using the autoscaling attribute in a Pachyderm Pipeline Spec when you have a workload that has variable processing requirements or when the processing load of your pipeline is difficult to predict.

Example scenarios:

  • Processing unpredictable workloads: When processing requirements vary, it is hard to predict how many workers are needed. Autoscaling adjusts the number of workers up or down based on the processing load.

  • Processing large datasets: When a pipeline processes a large dataset whose processing requirements are hard to estimate, autoscaling adds workers as the load requires.

  • Handling bursty workloads: When periods of high demand alternate with periods of low demand, autoscaling scales workers up for the bursts and back down afterward.

Datum Set Spec PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "datumSetSpec": {
        "number": 0,
        "sizeBytes": 0,
        "perWorker": 0
    },
    ...
}

Attributes #

| Attribute | Description |
| --------- | ----------- |
| number    | The desired number of datums in each datum set. If specified, each datum set will contain the specified number of datums. If the total number of input datums is not evenly divisible by the number of datums per set, the last datum set may contain fewer datums than the others. |
| sizeBytes | The desired target size of each datum set in bytes. If specified, Pachyderm will attempt to create datum sets with the specified size, though the actual size may vary due to the size of the input files. |
| perWorker | The desired number of datum sets that each worker should process at a time. This field is similar to number, but specifies the number of sets per worker instead of the number of datums per set. |

Behavior #

The datumSetSpec attribute in a Pachyderm Pipeline Spec is used to control how the input data is partitioned into individual datum sets for processing. Datum sets are the unit of work that workers claim, and each worker can claim 1 or more datums. Once done processing, it commits a full datum set.

  • number, if nonzero, specifies that each datum set should contain number datums. Sets may contain fewer if the total number of datums doesn’t divide evenly. Lowering number to 1 causes an update after every datum, at the cost of extra load on etcd, which can slow down other operations. Default is 0.

  • sizeBytes, if nonzero, specifies a target size for each set of datums. Sets may be larger or smaller than sizeBytes, but are usually close to it. Default is 0.

  • perWorker, if nonzero, specifies how many datum sets should be created for each worker. It can’t be set with number or sizeBytes. Default is 0.
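
As a rough illustration of the number rule, the following Python sketch splits a list of datums into consecutive sets. It is not Pachyderm's implementation; the helper chunk_datums and the datum names are hypothetical and exist only to show why the last set can be smaller.

```python
from typing import List, Sequence


def chunk_datums(datums: Sequence[str], number: int) -> List[List[str]]:
    """Split datums into consecutive sets of at most `number` items.

    Hypothetical helper: it only mirrors the documented rule that the
    last set may hold fewer datums when the total does not divide evenly.
    """
    if number <= 0:
        raise ValueError("this sketch only covers a nonzero, positive number")
    return [list(datums[i:i + number]) for i in range(0, len(datums), number)]


# Five datums with number=2 give two full sets and one smaller set.
datum_sets = chunk_datums(["d1", "d2", "d3", "d4", "d5"], number=2)
print(datum_sets)  # [['d1', 'd2'], ['d3', 'd4'], ['d5']]
```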

When to Use #

You should consider using the datumSetSpec attribute in your Pachyderm pipeline when you are experiencing stragglers, which are situations where most of the workers are idle but a few are still processing jobs. This can happen when the work is not divided up in a balanced way, which can cause some workers to be overloaded with work while others are idle.

Datum Timeout PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "datumTimeout": string,
    ...
}

Behavior #

The datumTimeout attribute in a Pachyderm pipeline is used to specify the maximum amount of time that a worker is allowed to process a single datum in the pipeline.

When a worker begins processing a datum, Pachyderm starts a timer that tracks the elapsed time since the datum was first assigned to the worker. If the worker has not finished processing the datum before the datumTimeout period has elapsed, Pachyderm will automatically mark the datum as failed and reassign it to another worker to retry. This helps to ensure that slow or problematic datums do not hold up the processing of the entire pipeline.

Other considerations:

  • Not set by default, allowing a datum to process for as long as needed.
  • Takes precedence over the parallelism or number of datums; no single datum is allowed to exceed this value.
  • The value must be a string that represents a time value, such as 1s, 5m, or 15h.
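
To make the value format concrete, here is a minimal Python sketch that converts the forms shown above (1s, 5m, 15h) into a duration. The function parse_timeout is hypothetical and accepts only a whole number followed by s, m, or h; Pachyderm may accept other forms that this sketch rejects.

```python
import re
from datetime import timedelta

# Maps the unit suffixes used in the examples to timedelta keyword names.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}


def parse_timeout(value: str) -> timedelta:
    """Parse strings such as "1s", "5m", or "15h" (hypothetical helper)."""
    match = re.fullmatch(r"(\d+)([smh])", value)
    if match is None:
        raise ValueError(f"unsupported timeout value in this sketch: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


print(parse_timeout("5m").total_seconds())  # 300.0
```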

When to Use #

You should consider using the datumTimeout attribute in your Pachyderm pipeline when you are processing large or complex datums that may take a long time to process, and you want to avoid having individual datums hold up the processing of the entire pipeline.

For example, if you are processing images or videos that are particularly large, or if your pipeline is doing complex machine learning or deep learning operations that can take a long time to run on individual datums, setting a reasonable datumTimeout can help ensure that your pipeline continues to make progress even if some datums are slow or problematic.

Datum Tries PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "datumTries": int,
    ...
}

Behavior #

The datumTries attribute in a Pachyderm pipeline specifies the maximum number of times that Pachyderm will try to process a datum. When a datum fails to process, either because of an error in the processing logic or because it exceeds the datumTimeout value, Pachyderm will automatically retry the datum until it is successful or the number of datumTries has been reached.

Each retry of a datum is treated as a new attempt, and the datum is added back to the job queue for processing. The retry process is transparent to the user and happens automatically within the Pachyderm system.

Other considerations:

  • datumTries is set to 3 by default if unspecified.
  • Setting to 1 attempts a datum once with no retries.
  • If all tries have been exhausted and processing has not succeeded, the datum is marked as Failed.
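
The retry rule can be sketched as a simple loop. This Python example is an illustration under stated assumptions, not Pachyderm's worker code: process_with_tries and flaky are hypothetical names, and any raised exception stands in for a processing error or a datumTimeout expiry.

```python
from typing import Callable


def process_with_tries(process: Callable[[], None], datum_tries: int = 3) -> str:
    """Run `process` up to `datum_tries` times; report "success" or "failed"."""
    for _attempt in range(datum_tries):
        try:
            process()
            return "success"
        except Exception:
            # A failed attempt consumes one try; the loop moves to the next one.
            continue
    return "failed"


calls = {"count": 0}


def flaky() -> None:
    """Fails on the first two calls, then succeeds (stand-in for user code)."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient error")


status = process_with_tries(flaky, datum_tries=3)
print(status, calls["count"])  # success 3
```

With datum_tries=1 the same function makes a single attempt and reports "failed" as soon as that attempt raises.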

When to Use #

You should consider setting a higher datumTries count if your pipeline has a large number of datums that are prone to errors or timeouts, or if the datums you are working with have to be imported or fetched (via data ingress) from an external source.

Description PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "description": string,
    ...
}

Behavior #

description is displayed in your pipeline details when viewed from pachctl or Console.

When to Use #

It’s recommended to always provide meaningful descriptions to your Pachyderm resources.

Egress PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "egress": {
        // Egress to an object store
        "URL": "s3://bucket/dir",
        // Egress to a database
        "sqlDatabase": {
            "url": string,
            "fileFormat": {
                "type": string,
                "columns": [string]
            },
            "secret": {
                "name": string,
                "key": "PACHYDERM_SQL_PASSWORD"
            }
        }
    },
    ...
}

Attributes #

| Attribute   | Description |
| ----------- | ----------- |
| URL         | The URL of the object store where the pipeline’s output data should be written. |
| sqlDatabase | An optional field that is used to specify how the pipeline should write output data to a SQL database. |
| url         | The URL of the SQL database, in the format postgresql://user:password@host:port/database. |
| fileFormat  | The file format of the output data, which can be specified as csv or tsv. This field also includes the column names that should be included in the output. |
| secret      | The name and key of the Kubernetes secret that contains the password for the SQL database. |

Behavior #

The egress field in a Pachyderm Pipeline Spec is used to specify how the pipeline should write the output data. The egress field supports two types of outputs: writing to an object store and writing to a SQL database.

Data is pushed after the user code finishes running but before the job is marked as successful. For more information, see Egress Data to an object store or Egress Data to a database.

This is required if the pipeline needs to write output data to an external storage system.

When to Use #

You should use the egress field in a Pachyderm Pipeline Spec when you need to write the output data from your pipeline to an external storage system, such as an object store or a SQL database.

Example scenarios:

  • Long-term data storage: If you need to store the output data from your pipeline for a long time, you can use the egress field to write the data to an object store, such as Amazon S3 or Google Cloud Storage.

  • Data sharing: If you need to share the output data from your pipeline with external users or systems, you can use the egress field to write the data to an object store that is accessible to those users or systems.

  • Analytics and reporting: If you need to perform further analytics or reporting on the output data from your pipeline, you can use the egress field to write the data to a SQL database that can be used for those purposes.

Input Cron PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "input": {
    "cron": {
      "name": string,
      "spec": string,
      "repo": string,
      "start": time,
      "overwrite": bool
    }
  },
  ...
}

Attributes #

| Attribute | Required? | Description |
| --------- | --------- | ----------- |
| name      | Yes       | The name of the cron job, which should be unique within the Pachyderm cluster. |
| spec      | Yes       | The cron schedule for the job, specified using the standard cron format or macros. See schedule macros for examples. Pachyderm also supports non-standard schedules, such as "@daily". |
| repo      | No        | The name of the input repository that the cron job should read data from. Default: <pipeline-name>_<input-name>. |
| start     | No        | Specifies the start time for the cron job. This is useful for running the job on a specific date in the future. If not specified, starts immediately. Specifying a time enables you to run on matching times from the past or skip times from the present and only start running on matching times in the future. Format the time value according to RFC3339. |
| overwrite | No        | Defines whether you want the timestamp file to be overwritten on each tick; defaults to simply writing new files on each tick. By default, when "overwrite" is disabled, ticks accumulate in the cron input repo. When "overwrite" is enabled, Pachyderm erases the old ticks and adds new ticks with each commit. If you do not add any manual ticks or run pachctl run cron, only one tick file per commit (for the latest tick) is added to the input repo. |
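
As a small sketch of the start attribute, the following Python snippet formats a UTC time as an RFC3339 string and places it in a cron input. The input name "tick", the chosen date, and the "@every 5m" schedule are illustrative values only; the snippet builds a dictionary and prints it as JSON, and does not talk to a cluster.

```python
import json
from datetime import datetime, timezone

# A timezone-aware UTC time; isoformat() yields "+00:00", which RFC3339
# also allows to be written as "Z".
start = datetime(2030, 1, 1, tzinfo=timezone.utc)
start_rfc3339 = start.isoformat().replace("+00:00", "Z")

cron_input = {
    "cron": {
        "name": "tick",        # illustrative input name
        "spec": "@every 5m",   # illustrative schedule
        "start": start_rfc3339,
        "overwrite": True,
    }
}

print(start_rfc3339)  # 2030-01-01T00:00:00Z
print(json.dumps(cron_input, indent=2))
```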

Behavior #

The input field in a Pachyderm Pipeline Spec is used to specify the inputs to the pipeline, which are the Pachyderm repositories that the pipeline should read data from. The input field can include both static and dynamic inputs.

The cron field within the input field is used to specify a dynamic input that is based on a cron schedule. This is useful for pipelines that need to process data on a regular schedule, such as daily or hourly.

A repo is created for each cron input. When a cron input triggers, pachd commits a single file to that repo. The file is named with the current RFC3339 timestamp and contains the time that satisfied the spec.

Callouts #

  • Avoid using intervals faster than 1-5 minutes
  • You can use never during development and manually trigger the pipeline
  • If using jsonnet, you can pass arguments like: --arg cronSpec="@every 5m"
  • You cannot update a cron pipeline after it has been created; instead, you must delete the pipeline and build a new one.

When to Use #

You should use a cron input in a Pachyderm Pipeline Spec when you need to process data on a regular schedule, such as hourly or daily. A cron input allows you to specify a schedule for the pipeline to run, and Pachyderm will automatically trigger the pipeline at the specified times.

Example scenarios:

  • Batch processing: If you have a large volume of data that needs to be processed on a regular schedule, a cron input can be used to trigger the processing automatically, without the need for manual intervention.

  • Data aggregation: If you need to collect data from different sources and process it on a regular schedule, a cron input can be used to automate the data collection and processing.

  • Report generation: If you need to generate reports on a regular schedule, a cron input can be used to trigger the report generation process automatically.

Input Cross PPS

Spec #

This is a top-level attribute of the pipeline spec.


{
  "pipeline": {...},
  "transform": {...},
  "input": {
    "cross": [
    {
      "pfs": {
        "project": string,
        "name": string,
        "repo": string,
        "branch": string,
        "glob": string,
        "lazy": bool,
        "emptyFiles": bool,
        "s3": bool
      }
    },
    {
      "pfs": {
        "project": string,
        "name": string,
        "repo": string,
        "branch": string,
        "glob": string,
        "lazy": bool,
        "emptyFiles": bool,
        "s3": bool
      }
    }
    ...
  ]},
  ...
}

Attributes #

| Attribute  | Description |
| ---------- | ----------- |
| name       | The name of the PFS input that appears in the INPUT field when you run the pachctl list pipeline command. If an input name is not specified, it defaults to the name of the repo. |
| repo       | Specifies the name of the Pachyderm repository that contains the input data. |
| branch     | The branch to watch for commits. If left blank, Pachyderm sets this value to master. |
| glob       | A wildcard pattern that defines how a dataset is broken up into datums for further processing. When you use a glob pattern in a group input, it creates a naming convention that Pachyderm uses to group the files. |
| lazy       | Controls how the data is exposed to jobs. The default is false which means the job eagerly downloads the data it needs to process and exposes it as normal files on disk. If lazy is set to true, data is exposed as named pipes instead, and no data is downloaded until the job opens the pipe and reads it. If the pipe is never opened, then no data is downloaded. |
| emptyFiles | Controls how files are exposed to jobs. If set to true, it causes files from this PFS to be presented as empty files. This is useful in shuffle pipelines where you want to read the names of files and reorganize them by using symlinks. |
| s3         | Indicates whether the input data is stored in an S3 object store. |

Behavior #

input.cross is an array of inputs to cross. The inputs do not have to be pfs inputs. They can also be union and cross inputs.

A cross input creates tuples of the datums in the inputs. In the example below, each input includes individual datums, such as if foo and bar were in the same repository with the glob pattern set to /*. Alternatively, each of these datums might have come from separate repositories with the glob pattern set to / and being the only file system objects in these repositories.

| inputA | inputB | inputA × inputB |
| ------ | ------ | --------------- |
| foo    | fizz   | (foo, fizz)     |
| bar    | buzz   | (foo, buzz)     |
|        |        | (bar, fizz)     |
|        |        | (bar, buzz)     |

The cross inputs above do not take a name and maintain the names of the sub-inputs. In the example above, you would see files under /pfs/inputA/... and /pfs/inputB/....
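
The tuples in the table are the Cartesian product of the two inputs’ datums. A minimal Python sketch with the standard library reproduces them; the list names input_a and input_b are placeholders for the datums of inputA and inputB, and the snippet says nothing about how Pachyderm schedules the resulting datums.

```python
from itertools import product

# Datums of each input, as in the table above.
input_a = ["foo", "bar"]
input_b = ["fizz", "buzz"]

# Every datum of inputA paired with every datum of inputB.
cross_datums = list(product(input_a, input_b))
print(cross_datums)
# [('foo', 'fizz'), ('foo', 'buzz'), ('bar', 'fizz'), ('bar', 'buzz')]
```

The number of crossed datums is the product of the input sizes, so crossing large inputs grows quickly.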

When to Use #

You should use a cross input in a Pachyderm Pipeline Spec when you need to perform operations on combinations of data from multiple Pachyderm repositories. The cross input allows you to generate a set of combinations of files between two or more repositories, which can be used as the input to your pipeline.

Example scenarios:

  • Data analysis: If you have data from multiple sources that you need to combine and analyze, a cross input can be used to generate a set of combinations of data that can be used as the input to your analysis.

  • Machine learning: If you need to train a machine learning model on combinations of data from multiple sources, a cross input can be used to generate a set of combinations of data that can be used as the input to your model.

  • Report generation: If you need to generate reports that combine data from multiple sources, a cross input can be used to generate a set of combinations of data that can be used as the input to your report generation process.

Input Group PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "input": {
    "group": [
      {
        "pfs": {
          "project": string,
          "name": string,
          "repo": string,
          "branch": string,
          "glob": string,
          "groupBy": string,
          "lazy": bool,
          "emptyFiles": bool,
          "s3": bool
        }
      },
      {
        "pfs": {
          "project": string,
          "name": string,
          "repo": string,
          "branch": string,
          "glob": string,
          "groupBy": string,
          "lazy": bool,
          "emptyFiles": bool,
          "s3": bool
        }
      }
    ]
  },
  ...
}

Attributes #

| Attribute  | Description |
| ---------- | ----------- |
| name       | The name of the PFS input that appears in the INPUT field when you run the pachctl list pipeline command. If an input name is not specified, it defaults to the name of the repo. |
| repo       | Specifies the name of the Pachyderm repository that contains the input data. |
| branch     | The branch to watch for commits. If left blank, Pachyderm sets this value to master. |
| glob       | A wildcard pattern that defines how a dataset is broken up into datums for further processing. When you use a glob pattern in a group input, it creates a naming convention that Pachyderm uses to group the files. |
| groupBy    | A parameter that is used to group input files by a specific pattern. |
| lazy       | Controls how the data is exposed to jobs. The default is false which means the job eagerly downloads the data it needs to process and exposes it as normal files on disk. If lazy is set to true, data is exposed as named pipes instead, and no data is downloaded until the job opens the pipe and reads it. If the pipe is never opened, then no data is downloaded. |
| emptyFiles | Controls how files are exposed to jobs. If set to true, it causes files from this PFS to be presented as empty files. This is useful in shuffle pipelines where you want to read the names of files and reorganize them by using symlinks. |
| s3         | Indicates whether the input data is stored in an S3 object store. |

Behavior #

The group input in a Pachyderm Pipeline Spec allows you to group input files by a specific pattern.

To use the group input, you specify one or more PFS inputs with a groupBy parameter. This parameter specifies a pattern or field to use for grouping the input files. The resulting groups are then passed to your pipeline as a series of grouped datums, where each datum is a single group of files.

You can specify multiple group input fields in a Pachyderm Pipeline Spec, each with their own groupBy parameter. This allows you to group files by multiple fields or patterns, and pass each group to your pipeline as a separate datum.

The glob and groupBy parameters must be configured.

When to Use #

You should consider using the group input in a Pachyderm Pipeline Spec when you have large datasets with multiple files that you want to partition or group by a specific field or pattern. This can be useful in a variety of scenarios, such as when you need to perform complex data analysis on a large dataset, or when you need to group files by some attribute or characteristic in order to facilitate further processing.

Example scenarios:

  • Partitioning data by time: If you have a large dataset that spans a long period of time, you might want to partition it by day, week, or month in order to perform time-based analysis or modeling. In this case, you could use the group input field to group files by date or time, and then process each group separately.

  • Grouping data by user or account: If you have a dataset that includes data from multiple users or accounts, you might want to group the data by user or account in order to perform user-based analysis or modeling. In this case, you could use the group input field to group files by user or account, and then process each group separately.

  • Partitioning data by geography: If you have a dataset that includes data from multiple geographic regions, you might want to partition it by region in order to perform location-based analysis or modeling. In this case, you could use the group input field to group files by region, and then process each group separately.

Input Join PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "input": {
    "join": [
      {
        "pfs": {
          "project": string,
          "name": string,
          "repo": string,
          "branch": string,
          "glob": string,
          "joinOn": string,
          "outerJoin": bool,
          "lazy": bool,
          "emptyFiles": bool,
          "s3": bool
        }
      },
      {
        "pfs": {
          "project": string,
          "name": string,
          "repo": string,
          "branch": string,
          "glob": string,
          "joinOn": string,
          "outerJoin": bool,
          "lazy": bool,
          "emptyFiles": bool,
          "s3": bool
        }
      }
    ]
  },
  ...
}

Behavior #

  • A join input must have the glob and joinOn parameters configured to work properly. A join can combine multiple PFS inputs.
  • You can optionally add "outerJoin": true to your PFS input. This changes the join’s behavior from the default “inner-join” (a datum is created only if there is a match) to an “outer-join” (the repos marked as "outerJoin": true see a datum even if there is no match).
  • You can set "outerJoin": true on any number of the PFS inputs within your join, including none.

Capture Groups #

When you configure a join input (inner or outer), you must specify a glob pattern that includes a capture group. The capture group defines the specific string in the file path that is used to match files in other joined repos. Capture groups work analogously to regex capture groups. You define a capture group inside parentheses. Capture groups are numbered from left to right and can also be nested within each other. Numbering for nested capture groups is based on their opening parenthesis.

Below you can find a few examples of applying a glob pattern with a capture group to a file path. For example, if you have the following file path:

/foo/bar-123/ABC.txt

The following glob patterns in a join input create the following capture groups:

| Glob pattern      | Capture groups |
| ----------------- | -------------- |
| /(*)              | foo |
| /*/bar-(*)        | 123 |
| /(*)/*/(??)*.txt  | Capture group 1: foo, capture group 2: AB. |
| /*/(bar-(123))/*  | Capture group 1: bar-123, capture group 2: 123. |

Also, joins require you to specify a replacement group in the joinOn parameter to define which capture groups you want to try to match.

For example, $1 indicates that you want Pachyderm to match based on capture group 1. Similarly, $2 matches the capture group 2. $1$2 means that it must match both capture groups 1 and 2.
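
To see how capture groups and joinOn fit together, the Python sketch below translates a glob pattern into a regular expression and builds a join key. It is an approximation for illustration only: glob_to_regex, capture_groups, and join_key are hypothetical helpers, only *, ?, and parentheses are handled, and a pattern is tried against the file path and then each parent directory, which is an assumption made so that directory-level patterns such as /(*) can match.

```python
import re
from typing import Optional, Tuple


def glob_to_regex(glob: str) -> "re.Pattern[str]":
    """Translate a glob with capture groups into a regex (sketch only)."""
    parts = []
    for ch in glob:
        if ch == "*":
            parts.append("[^/]*")      # any run of characters within one path segment
        elif ch == "?":
            parts.append("[^/]")       # exactly one character within a segment
        elif ch in "()":
            parts.append(ch)           # keep parentheses as regex capture groups
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts))


def capture_groups(glob: str, path: str) -> Optional[Tuple[str, ...]]:
    """Match the glob against the path or one of its parent directories."""
    pattern = glob_to_regex(glob)
    candidate = path
    while candidate:
        match = pattern.fullmatch(candidate)
        if match:
            return match.groups()
        candidate = candidate.rsplit("/", 1)[0]  # step up to the parent directory
    return None


def join_key(join_on: str, groups: Tuple[str, ...]) -> str:
    """Replace $1, $2, ... in join_on with the matching capture groups."""
    return re.sub(r"\$(\d+)", lambda m: groups[int(m.group(1)) - 1], join_on)


path = "/foo/bar-123/ABC.txt"
groups = capture_groups("/(*)/*/(??)*.txt", path)
print(groups)                    # ('foo', 'AB')
print(join_key("$1$2", groups))  # fooAB
```

Two files from different inputs would be joined in this sketch when their join keys are equal.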

See the full join input configuration in the pipeline specification.

You can test your glob pattern and capture groups by using the pachctl list datum -f <your_pipeline_spec.json> command.

💡

The content of the capture group defined in the joinOn parameter is available to your pipeline’s code in an environment variable: PACH_DATUM_<input.name>_JOIN_ON.

Examples #

Inner Join #

By default, a join input has an inner-join behavior.

For example, you have two repositories. One with sensor readings and the other with parameters. The repositories have the following structures:

  • readings repo:

    ├── ID1234
        ├── file1.txt
        ├── file2.txt
        ├── file3.txt
        ├── file4.txt
        ├── file5.txt
  • parameters repo:

    ├── file1.txt
    ├── file2.txt
    ├── file3.txt
    ├── file4.txt
    ├── file5.txt
    ├── file6.txt
    ├── file7.txt
    ├── file8.txt

Pachyderm runs your code only on the pairs of files that match the glob pattern and capture groups.

The following example shows how you can use joins to group matching IDs:

{
  "pipeline": {
    "name": "joins"
  },
  "input": {
    "join": [
      {
        "pfs": {
          "repo": "readings",
          "branch": "master",
          "glob": "/*/(*).txt",
          "joinOn": "$1"
        }
      },
      {
        "pfs": {
          "repo": "parameters",
          "branch": "master",
          "glob": "/(*).txt",
          "joinOn": "$1"
        }
      }
    ]
  },
  "transform": {
    "cmd": ["python3", "/joins.py"],
    "image": "joins-example"
  }
}

The glob pattern for the readings repository, /*/(*).txt, indicates all matching files in the ID sub-directory. In the parameters repository, the glob pattern /(*).txt selects all the matching files in the root directory. All files with indices from 1 to 5 match. The files with indices from 6 to 8 do not match. Therefore, you only get five datums for this job.
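
The count of five datums can be checked with a small simulation. In this Python sketch the two glob patterns are translated by hand into regular expressions (an approximation of the glob semantics), the file lists mirror the repository structures above, and index_by_key is a hypothetical helper. It models the matching rule only, not how Pachyderm builds datums.

```python
import re
from typing import Dict, List

# File paths mirroring the readings and parameters repos described above.
readings = [f"/ID1234/file{i}.txt" for i in range(1, 6)]
parameters = [f"/file{i}.txt" for i in range(1, 9)]

# Hand-translated equivalents of the globs "/*/(*).txt" and "/(*).txt".
READINGS_RE = re.compile(r"/[^/]*/([^/]*)\.txt")
PARAMETERS_RE = re.compile(r"/([^/]*)\.txt")


def index_by_key(paths: List[str], pattern: "re.Pattern[str]") -> Dict[str, str]:
    """Map capture group 1 (the joinOn value "$1") to the matching path."""
    keyed = {}
    for path in paths:
        match = pattern.fullmatch(path)
        if match:
            keyed[match.group(1)] = path
    return keyed


readings_by_key = index_by_key(readings, READINGS_RE)
parameters_by_key = index_by_key(parameters, PARAMETERS_RE)

# Inner join: keep only the keys present in both inputs.
shared_keys = sorted(readings_by_key.keys() & parameters_by_key.keys())
inner_datums = [(readings_by_key[k], parameters_by_key[k]) for k in shared_keys]

print(len(inner_datums))  # 5
print(inner_datums[0])    # ('/ID1234/file1.txt', '/file1.txt')
```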

To experiment further, see the full joins example.

Outer Join #

Pachyderm also supports outer joins. Outer joins include everything an inner join does plus the files that didn’t match anything. Inputs can be set to outer semantics independently. So while there isn’t an explicit notion of “left” or “right” outer joins, you can still get those semantics, and even extend them to multiway joins.

Building off the previous example, notice that there are 3 files in the parameters repo, file6.txt, file7.txt and file8.txt, which don’t match any files in the readings repo. In an inner join, those files are omitted. If you still want to see the files without a match, you can use an outer join. The change to the pipeline spec is simple:

{
  "pipeline": {
    "name": "joins"
  },
  "input": {
    "join": [
      {
        "pfs": {
          "repo": "readings",
          "branch": "master",
          "glob": "/*/(*).txt",
          "joinOn": "$1"
        }
      },
      {
        "pfs": {
          "repo": "parameters",
          "branch": "master",
          "glob": "/(*).txt",
          "joinOn": "$1",
          "outerJoin": true
        }
      }
    ]
  },
  "transform": {
    "cmd": ["python3", "/joins.py"],
    "image": "joins-example"
  }
}

Your code will see the joined pairs that it saw before. In addition to those five datums, your code will also see three new ones: one for each of the files in parameters that didn’t match. Note that this means that your code needs to handle (not crash) the case where input files are missing from /pfs/readings.
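
A short Python sketch of the outer-join outcome: every parameters file yields a datum, and the readings side is None when nothing matched. The join keys and the /pfs paths are modeled on the example repos; the None placeholder is an assumption of this sketch that stands for "no file present under /pfs/readings", which is the case user code has to tolerate.

```python
from typing import List, Optional, Tuple

# Join keys (capture group 1) found in each repo of the example.
readings_keys = {f"file{i}" for i in range(1, 6)}
parameters_keys = {f"file{i}" for i in range(1, 9)}

# parameters is marked "outerJoin": true, so each of its files produces a datum.
datums: List[Tuple[Optional[str], str]] = []
for key in sorted(parameters_keys):
    reading = f"/pfs/readings/ID1234/{key}.txt" if key in readings_keys else None
    datums.append((reading, f"/pfs/parameters/{key}.txt"))

unmatched = [parameter for reading, parameter in datums if reading is None]

print(len(datums))     # 8
print(len(unmatched))  # 3
```

In real user code the equivalent check would be whether the expected file exists under /pfs/readings before reading it.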

To experiment further, see the full join example.

Input PFS PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "input": {
        "pfs": {
            "project": string,
            "name": string,
            "repo": string,
            "repoType": string,
            "branch": string,
            "glob": string,
            "joinOn": string,
            "outerJoin": bool,
            "groupBy": string,
            "lazy": bool,
            "emptyFiles": bool,
            "s3": bool,
            "trigger": {
                "branch": string,
                "all": bool,
                "cronSpec": string
            }
        }
    },
    ...
}

Behavior #

input.pfs.name is the name of the input. An input with the name XXX is visible under the path /pfs/XXX when a job runs. Input names must be unique if the inputs are crossed, but they may be duplicated between PFSInputs that are combined by using the union operator. This is because when PFSInputs are combined, you only ever see a datum from one input at a time. Overlapping the names of combined inputs allows you to write simpler code since you no longer need to consider which input directory a particular datum comes from. If an input’s name is not specified, it defaults to the name of the repo. Therefore, if you have two crossed inputs from the same repo, you must give at least one of them a unique name.

Input Union PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "input": {
    "union": [
    {
      "pfs": {
        "project": string,
        "name": string,
        "repo": string,
        "branch": string,
        "glob": string,
        "lazy": bool,
        "emptyFiles": bool,
        "s3": bool
      }
    },
    {
      "pfs": {
        "project": string,
        "name": string,
        "repo": string,
        "branch": string,
        "glob": string,
        "lazy": bool,
        "emptyFiles": bool,
        "s3": bool
      }
    }
    ...
  ]},
  ...
}

Behavior #

input.union is an array of inputs to combine. The inputs do not have to be pfs inputs. They can also be union and cross inputs.

Union inputs take the union of other inputs. In the example below, each input includes individual datums, such as if foo and bar were in the same repository with the glob pattern set to /*. Alternatively, each of these datums might have come from separate repositories with the glob pattern set to / and being the only file system objects in these repositories.

| inputA | inputB | inputA ∪ inputB |
| ------ | ------ | --------------- |
| foo    | fizz   | foo             |
| bar    | buzz   | fizz            |
|        |        | bar             |
|        |        | buzz            |

The union inputs do not take a name and maintain the names of the sub-inputs. In the example above, you would see files under /pfs/inputA/... or /pfs/inputB/..., but never both at the same time. When you write code to address this behavior, make sure that your code first determines which input directory is present. Starting with Pachyderm 1.5.3, we recommend that you give your inputs the same name. That way your code only needs to handle data being present in that directory. This only works if your code does not need to be aware of which of the underlying inputs the data comes from.
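
For inputs that keep distinct names, user code can check which input directory exists before reading. The Python sketch below shows one way to do that; present_inputs is a hypothetical helper, and a temporary directory stands in for /pfs so the snippet runs outside a pipeline.

```python
import os
import tempfile
from typing import List, Sequence


def present_inputs(pfs_root: str, input_names: Sequence[str]) -> List[str]:
    """Return the input names whose directory exists under pfs_root."""
    return [
        name for name in input_names
        if os.path.isdir(os.path.join(pfs_root, name))
    ]


# Simulate a datum that came from inputB: only that directory is present.
with tempfile.TemporaryDirectory() as fake_pfs:
    os.makedirs(os.path.join(fake_pfs, "inputB"))
    found = present_inputs(fake_pfs, ["inputA", "inputB"])

print(found)  # ['inputB']
```

In a running pipeline the root would be /pfs. Giving both inputs the same name removes the need for this check.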

Job Timeout PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "jobTimeout": string,
    ...
}

Behavior #

  • Work that is not complete by set timeout is interrupted.
  • Value must be a string that represents a time value, such as 1s, 5m, or 15h.
  • Differs from datumTimeout in that the limit is applied across all workers and all datums.
  • If not set, a job will run indefinitely until it succeeds or fails.

Metadata PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "metadata": {
        "annotations": {
            "annotation": string
        },
        "labels": {
            "label": string
        }
    },
    ...
}

Behavior #

  • Labels help organize and track cluster objects by creating groups of pods based on a given dimension.

  • Annotations enable you to specify any arbitrary metadata.

Both parameters require a key-value pair. Do not confuse this parameter with podPatch, which adds metadata to the user container of the pipeline pod. For more information, see Labels and Selectors and Kubernetes Annotations in the Kubernetes documentation.

When to Use #

Use metadata for operation ergonomics and to simplify the querying of Kubernetes objects.
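As a rough illustration of the querying use case, the sketch below turns a labels map into an equality-based Kubernetes label selector, which can be passed to kubectl get pods -l. The helper name label_selector and the example labels are hypothetical.

```python
def label_selector(labels):
    """Build an equality-based label selector string such as "env=dev,team=ml".

    Keys are sorted so the output is stable regardless of dict ordering.
    """
    return ",".join(f"{key}={value}" for key, value in sorted(labels.items()))


# Hypothetical labels, as they might appear under "metadata.labels".
selector = label_selector({"team": "ml", "env": "dev"})
print(f"kubectl get pods -l {selector}")
```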

Output Branch PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "outputBranch": string,
    ...
}

Behavior #

  • Set to master by default.

When to Use #

Use this setting to output commits to dev or testing branches.

Parallelism Spec PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "parallelismSpec": {
    "constant": int
  },
  ...
}

Behavior #

Pachyderm starts the number of workers that you specify. For example, set "constant": 10 to use 10 workers.

  • The default value is 1.

When to Use #

⚠️

Because spouts and services are designed to be single instances, do not modify the default parallelismSpec value for these pipelines.

Pod Patch PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "podPatch": string,
    ...
}

Behavior #

podPatch is similar to podSpec but is applied as a JSON Patch. This means that the process described in Pod Spec, modifying an existing pod spec and then manually blanking unchanged fields, won’t work. Instead, you need to create a correctly formatted patch by diffing the two pod specs.
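One way to produce such a patch is to diff the two pod specs programmatically. The following is a minimal sketch, assuming both specs are already loaded as Python dicts; the function name diff_to_patch and the example fields are illustrative, and lists are replaced wholesale rather than diffed element by element. The operation format follows RFC 6902 (JSON Patch).

```python
import json


def _escape(token):
    # RFC 6901: "~" becomes "~0" and "/" becomes "~1" inside a path token.
    return str(token).replace("~", "~0").replace("/", "~1")


def diff_to_patch(old, new, path=""):
    """Return a list of RFC 6902 operations that turn `old` into `new`."""
    if isinstance(old, dict) and isinstance(new, dict):
        ops = []
        for key in sorted(old.keys() - new.keys()):
            ops.append({"op": "remove", "path": f"{path}/{_escape(key)}"})
        for key in sorted(new.keys() - old.keys()):
            ops.append({"op": "add", "path": f"{path}/{_escape(key)}", "value": new[key]})
        for key in sorted(old.keys() & new.keys()):
            ops.extend(diff_to_patch(old[key], new[key], f"{path}/{_escape(key)}"))
        return ops
    if old == new:
        return []
    # Lists and scalars are replaced as a whole in this sketch.
    return [{"op": "replace", "path": path, "value": new}]


# Hypothetical before/after pod specs.
before = {"terminationGracePeriodSeconds": 30, "nodeSelector": {"disk": "ssd"}}
after = {"terminationGracePeriodSeconds": 60, "nodeSelector": {"disk": "ssd"}, "hostNetwork": True}

ops = diff_to_patch(before, after)
# podPatch is a string field, so the patch is serialized before use.
pod_patch = json.dumps(ops)
```

Because podPatch is declared as a string in the spec, the serialized pod_patch value is what would go into the pipeline spec.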

Pod Spec PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "podSpec": string,
    ...
}

Behavior #

podSpec is an advanced option that allows you to set fields in the pod spec that haven’t been explicitly exposed in the rest of the pipeline spec. A good way to figure out what JSON you should pass is to create a pod in Kubernetes with the proper settings, then do:

kubectl get po/<pod-name> -o json | jq .spec

This gives you a correctly formatted piece of JSON. You should then remove the extraneous fields that Kubernetes injects or that can be set elsewhere.

The JSON is applied as a JSON Merge Patch after the other parameters for the podSpec have already been set. This means that you can modify things such as the storage and user containers.
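To make the merge semantics concrete, here is a small sketch of the JSON Merge Patch algorithm as defined in RFC 7396. It is an illustration of the standard, not Pachyderm's implementation, and the example pod spec fields are hypothetical.

```python
def merge_patch(target, patch):
    """Apply an RFC 7396 JSON Merge Patch to `target` and return the result."""
    if not isinstance(patch, dict):
        # A non-object patch replaces the target entirely (this includes lists).
        return patch
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            # null in the patch removes the key from the target.
            result.pop(key, None)
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


# Hypothetical generated pod spec and user-supplied podSpec fragment.
generated = {"hostNetwork": False, "nodeSelector": {"disk": "ssd", "zone": "a"}}
user_pod_spec = {"hostNetwork": True, "nodeSelector": {"zone": None}}

merged = merge_patch(generated, user_pod_spec)
```

Note that under RFC 7396 an array in the patch replaces the target array as a whole, so a podSpec that sets a list-valued field must supply the complete list.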

Reprocess Spec PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "reprocessSpec": string,
    ...
}

Behavior #

"reprocessSpec": "until_success" is the default behavior. To mitigate datums failing for transient connection reasons, Pachyderm automatically retries user code three (3) times before marking a datum as failed. Additionally, you can set the datumTries field to determine the number of times a job attempts to run on a datum when a failure occurs.

Let’s compare "until_success" and "every_job":

Say we have 2 pipelines (reprocess_until_success.json and reprocess_at_every_job.json) that are identical except that the "reprocessSpec" field is set to "every_job" in reprocess_at_every_job.json.

Both use the same input repo and have a glob pattern set to /*.

  • When adding 3 text files to the input repo (file1.txt, file2.txt, file3.txt), the 2 pipelines (reprocess_until_success and reprocess_at_every_job) will process the 3 datums (here, the glob pattern /* creates one datum per file).
  • Now, let’s add a 4th file file4.txt to our input repo or modify the content of file2.txt for example.
    • Case of our default reprocess_until_success.json pipeline: Running list datum on the job ID shows 4 datums, of which 3 were skipped. (Only the changed file was processed.)
    • Case of reprocess_at_every_job.json: Running list datum on the job ID shows that all 4 datums were reprocessed; none were skipped.

⚠️

"reprocessSpec": "every_job" will not take advantage of Pachyderm’s default de-duplication. In effect, this can lead to slower pipeline performance. Before using this setting, consider other options such as including metadata in your file, or naming your files with a timestamp, UUID, or other unique identifier, in order to take advantage of de-duplication.
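The difference between the two settings can be modeled with a toy function. This is only a conceptual sketch: it assumes each datum is identified by a name and a content hash, which is a simplification, and datums_to_process is a hypothetical helper rather than anything Pachyderm exposes.

```python
def datums_to_process(previous, current, reprocess_spec="until_success"):
    """Return the datum names a job would process in this toy model.

    `previous` maps datum name -> content hash for datums that already
    succeeded; `current` is the same mapping for the new input commit.
    """
    if reprocess_spec == "every_job":
        # Every datum is processed again; nothing is skipped.
        return sorted(current)
    # "until_success": skip datums whose content is unchanged.
    return sorted(name for name, digest in current.items() if previous.get(name) != digest)


previous = {"file1.txt": "a1", "file2.txt": "b2", "file3.txt": "c3"}
current = {**previous, "file4.txt": "d4"}

print(datums_to_process(previous, current))               # only file4.txt
print(datums_to_process(previous, current, "every_job"))  # all four files
```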

When to Use #

By default, Pachyderm avoids repeated processing of unchanged datums (i.e., it processes only the datums that have changed and skips the unchanged datums). This incremental behavior ensures efficient resource utilization. However, you might need to alter this behavior for specific use cases and force the reprocessing of all of your datums systematically. This is especially useful when your pipeline makes an external call to other resources, such as a deployment or triggering an external pipeline system. Set "reprocessSpec": "every_job" to enable this behavior.

Resource Limits PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "resourceLimits": {
    "cpu": number,
    "memory": string,
    "gpu": {
      "type": string,
      "number": int
    },
    "disk": string
  },
  ...
}

Behavior #

resourceLimits describes the upper threshold of allowed resources a given worker can consume. If a worker exceeds this value, it will be evicted.

The number value in the gpu field describes how many GPUs each worker needs. Only whole numbers are supported because Kubernetes does not allow multiplexing of GPUs. Unlike the other resource fields, GPUs only have meaning in limits: by requesting a GPU, the worker gets sole access to that GPU while it is running. We recommend enabling autoscaling if you use GPUs, so that other processes in the cluster can access the GPUs while the pipeline has nothing to process. For more information about scheduling GPUs, see the Kubernetes docs on the subject.

Resource Requests PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "resourceRequests": {
    "cpu": number,
    "memory": string,
    "gpu": {
      "type": string,
      "number": int
    },
    "disk": string
  },
  ...
}

Behavior #

resourceRequests describes the amount of resources that the pipeline workers will consume. Knowing this in advance enables Pachyderm to schedule big jobs on separate machines, so that they do not conflict, slow down, or terminate.

This parameter is optional, and if you do not explicitly add it in the pipeline spec, Pachyderm creates Kubernetes containers with the following default resources:

  • The user and storage containers request 1 CPU, 0 disk space, and 256MB of memory.
  • The init container requests the same amount of CPU, memory, and disk space that is set for the user container.

The resourceRequests parameter enables you to overwrite these default values.

The memory field is a string that describes the amount of memory, in bytes, that each worker needs. Allowed SI suffixes include M, K, G, Mi, Ki, Gi, and others.

For example, a worker that needs to read a 1GB file into memory might set "memory": "1.2G" with a little extra for the code to use in addition to the file. Workers for this pipeline will be placed on machines with at least 1.2GB of free memory, and other large workers will be prevented from using it, if they also set their resourceRequests.
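To see what a value such as "1.2G" amounts to in bytes, the sketch below converts a quantity string using decimal (K, M, G) and binary (Ki, Mi, Gi) suffixes. It is an illustrative helper, not Pachyderm or Kubernetes code: quantity_to_bytes is a hypothetical name, and only the suffixes listed in the table are handled.

```python
from decimal import Decimal

# Decimal suffixes are powers of 1000; binary suffixes are powers of 1024.
_SUFFIXES = {
    "K": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
}


def quantity_to_bytes(quantity):
    """Convert a quantity string such as "1.2G" or "256Mi" to a byte count."""
    # Check two-character suffixes before one-character ones.
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if quantity.endswith(suffix):
            number = Decimal(quantity[: -len(suffix)])
            return int(number * _SUFFIXES[suffix])
    return int(Decimal(quantity))


print(quantity_to_bytes("1.2G"))   # 1200000000
print(quantity_to_bytes("256Mi"))  # 268435456
```

The two results differ in base: "1G" is 1,000,000,000 bytes, while "1Gi" is 1,073,741,824 bytes, which matters when sizing a request close to a file's size.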

The cpu field is a number that describes the amount of CPU time in cpu seconds/real seconds that each worker needs. Setting "cpu": 0.5 indicates that the worker should get 500ms of CPU time per second. Setting "cpu": 2 indicates that the worker gets 2000ms of CPU time per second. In other words, it is using 2 CPUs, though worker threads might spend 500ms on four physical CPUs instead of one second on two physical CPUs.

The disk field is a string that describes the amount of ephemeral disk space, in bytes, that each worker needs. Allowed SI suffixes include M, K, G, Mi, Ki, Gi, and others.

In both cases, the resource requests are not upper bounds. A worker that uses more memory than it requested is not necessarily shut down. However, if the whole node runs out of memory, Kubernetes starts deleting pods that have been placed on it and have exceeded their memory request, in order to reclaim memory. To prevent deletion of your worker pods, set your memory request to a sufficiently large value. However, if the total memory requested by all workers in the system is too large, Kubernetes cannot schedule new workers because no machine has enough unclaimed memory. cpu works similarly, but for CPU time.

For more information about resource requests and limits see the Kubernetes docs on the subject.

s3 Out PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "s3Out": bool,
    ...
}

Behavior #

s3Out allows your pipeline code to write results out to an S3 gateway endpoint instead of the typical pfs/out directory. When this parameter is set to true, Pachyderm includes a sidecar S3 gateway instance container in the same pod as the pipeline container. The address of the output repository will be s3://<output_repo>.

If you want to expose an input repository through an S3 gateway, see input.pfs.s3 in PFS Input.

When to Use #

You should use the s3Out attribute when you’d like to access and store the results of your Pachyderm transformations externally.

Scheduling Spec PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "schedulingSpec": {
        "nodeSelector": {string: string},
        "priorityClassName": string
    },
    ...
}

Attributes #

| Attribute | Description |
| --------- | ----------- |
| nodeSelector | Allows you to select which nodes your pipeline will run on. Refer to the Kubernetes docs on node selectors for more information about how this works. |
| priorityClassName | Allows you to select the priority class for the pipeline, which is how Kubernetes chooses to schedule and de-schedule the pipeline. Refer to the Kubernetes docs on priority and preemption for more information about how this works. |

Behavior #

  • When you include a nodeSelector in the schedulingSpec, it tells Kubernetes to schedule the pipeline’s Pods on nodes that match the specified key-value pairs. For example, if you specify {"gpu": "true"} in the nodeSelector, Kubernetes will only schedule the pipeline’s Pods on nodes that have a label gpu=true. This is useful when you have specific hardware or other node-specific requirements for your pipeline.

  • When you specify a priorityClassName in the schedulingSpec, it tells Kubernetes to assign the specified priority class to the pipeline’s Pods. The priority class determines the priority of the Pods relative to other Pods in the cluster, and can affect the order in which Pods are scheduled and the resources they are allocated. For example, if you have a high-priority pipeline that needs to complete as quickly as possible, you can assign it a higher priority class than other Pods in the cluster to ensure that it gets scheduled and allocated resources first.
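The nodeSelector matching rule described above amounts to a subset check on node labels. The following sketch models it; node_matches is a hypothetical helper for illustration, not Kubernetes scheduler code, and the labels are made up.

```python
def node_matches(node_labels, node_selector):
    """Return True if the node carries every key-value pair in the selector."""
    return all(node_labels.get(key) == value for key, value in node_selector.items())


selector = {"gpu": "true"}
print(node_matches({"gpu": "true", "zone": "us-east-1a"}, selector))  # True
print(node_matches({"zone": "us-east-1a"}, selector))                 # False
```

An empty selector matches every node, which corresponds to omitting nodeSelector from the schedulingSpec.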

When to Use #

You should use the schedulingSpec field in a Pachyderm Pipeline Spec when you have specific requirements for where and when your pipeline runs. This can include requirements related to hardware, node labels, scheduling priority, and other factors.

Example requirements:

  • Hardware requirements: If your pipeline requires specific hardware, such as GPUs, you can use the nodeSelector field to ensure that your pipeline runs on nodes that have the necessary hardware.

  • Node labels: If you have specific requirements for node labels, such as data locality, you can use the nodeSelector field to schedule your pipeline on nodes with the appropriate labels.

  • Priority: If you have a high-priority pipeline that needs to complete as quickly as possible, you can use the priorityClassName field to assign a higher priority class to your pipeline’s Pods.

  • Resource constraints: If your pipeline requires a large amount of resources, such as CPU or memory, you can use the nodeSelector field to ensure that your pipeline runs on nodes with sufficient resources.

Service PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "service": {
    "internalPort": int,
    "externalPort": int
  },
  ...
}

Attributes #

| Attribute | Description |
| --------- | ----------- |
| internalPort | The port that the user code binds to inside the container. |
| externalPort | The port on which it is exposed through the NodePorts functionality of Kubernetes services. |

Behavior #

  • When enabled, transform.cmd is not expected to exit and will restart if it does.
  • The service becomes exposed outside the container using a Kubernetes service.
  • You can access the service at http://<kubernetes-host>:<externalPort>.
  • The Service starts running at the first commit in the input repo.

When to Use #

You should use the service field in a Pachyderm Pipeline Spec when you want to expose your pipeline as a Kubernetes service, and allow other Kubernetes services or external clients to connect to it.

Example scenarios:

  • Microservices architecture: If you are building a microservices architecture, you may want to expose individual pipelines as services that can be accessed by other services in the cluster. By using the service field to expose your pipeline as a Kubernetes service, you can easily connect it to other services in the cluster.

  • Client access: If you want to allow external clients to access the output of your pipeline, you can use the service field to expose your pipeline as a Kubernetes service and provide clients with the service’s IP address and externalPort.

  • Load balancing: By exposing your pipeline as a Kubernetes service, you can take advantage of Kubernetes’ built-in load balancing capabilities. Kubernetes automatically load balances traffic to the service’s IP address and externalPort across all the replicas of the pipeline’s container.

Sidecar Resource Limits PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "sidecarResourceLimits": {
    "cpu": number,
    "memory": string,
    "gpu": {
      "type": string,
      "number": int
    },
    "disk": string
  },
  ...
}

Attributes #

| Attribute | Description |
| --------- | ----------- |
| cpu | The maximum number of CPU cores that the sidecar container can use. |
| memory | The maximum amount of memory that the sidecar container can use. This can be specified in bytes, or with a unit such as “Mi” or “Gi”. |
| gpu | An optional field that specifies the number and type of GPUs that the sidecar container can use. |
| type | The type of GPU to use, such as “nvidia” or “amd”. |
| number | The number of GPUs that the sidecar container can use. |
| disk | The maximum amount of disk space that the sidecar container can use. This can be specified in bytes, or with a unit such as “Mi” or “Gi”. |

Behavior #

The sidecarResourceLimits field in a Pachyderm Pipeline Spec is used to specify the resource limits for any sidecar containers that are run alongside the main pipeline container.

In a Pachyderm Pipeline, sidecar containers can be used to perform additional tasks alongside the main pipeline container, such as logging, monitoring, or handling external dependencies. By specifying resource limits for these sidecar containers, you can ensure that they don’t consume too many resources and impact the performance of the main pipeline container.

This field can also be useful in deployments where Kubernetes automatically applies resource limits to containers, which might conflict with Pachyderm pipelines’ resource requests. Such a deployment might fail if Pachyderm requests more than the default Kubernetes limit. The sidecarResourceLimits enables you to explicitly specify these resources to fix the issue.

When to Use #

You should use the sidecarResourceLimits field in a Pachyderm Pipeline Spec when you have sidecar containers that perform additional tasks alongside the main pipeline container, and you want to set resource limits for those sidecar containers.

Example scenarios:

  • Logging: If you have a sidecar container that is responsible for logging, you may want to limit its CPU and memory usage to prevent it from consuming too many resources and impacting the performance of the main pipeline container.

  • Monitoring: If you have a sidecar container that is responsible for monitoring the pipeline, you may want to limit its CPU and memory usage to prevent it from competing with the main pipeline container for resources.

  • External dependencies: If you have a sidecar container that provides external dependencies, such as a database, you may want to limit its CPU and memory usage to ensure that the main pipeline container has sufficient resources to perform its task.

Sidecar Resource Requests PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "sidecarResourceRequests": {
    "cpu": number,
    "memory": string,
    "gpu": {
      "type": string,
      "number": int
    },
    "disk": string
  },
  ...
}

Attributes #

| Attribute | Description |
| --------- | ----------- |
| cpu | The minimum number of CPU cores that the storage container will reserve. |
| memory | The minimum amount of memory that the storage container will reserve. This can be specified in bytes, or with a unit such as “Mi” or “Gi”. |
| gpu | An optional field that specifies the number and type of GPUs that the storage container will reserve. |
| type | The type of GPU to use, such as “nvidia” or “amd”. |
| number | The number of GPUs that the storage container will reserve. |
| disk | The minimum amount of disk space that the storage container will reserve. This can be specified in bytes, or with a unit such as “Mi” or “Gi”. |

Behavior #

The sidecarResourceRequests field in a Pachyderm Pipeline Spec is used to specify the resource requests for the storage container that runs alongside the user container.

In a Pachyderm Pipeline, the storage container is used to perform additional tasks alongside the user pipeline container, such as logging, monitoring, or handling external dependencies. By specifying resource requests for this sidecar container, you can ensure that the storage container has enough resources reserved so that it does not impact the performance of the user container.

Spec Commit PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "specCommit": {
    "option": false,
    "branch": {
      "option": false,
      "repo": {
        "option": false,
        "name": string,
        "type": string,
        "project":{
          "option": false,
          "name": string,
        },
      },
      "name": string
    },
    "id": string,
  },
  ...
}

When to Use #

You never need to configure this attribute; its details are auto-generated.

Spout PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
  "pipeline": {...},
  "transform": {...},
  "spout": {
    // Optionally, you can combine a spout with a service:
    "service": {
      "internalPort": int,
      "externalPort": int
    }
  },
  ...
}

Attributes #

| Attribute | Description |
| --------- | ----------- |
| service | An optional field that is used to specify how to expose the spout as a Kubernetes service. |
| internalPort | Used for the spout’s container. |
| externalPort | Used for the Kubernetes service that exposes the spout. |

Behavior #

  • Does not have a PFS input; instead, it consumes data from an outside source.
  • Can have a service added to it. See Service.
  • Its code runs continuously, waiting for new events.
  • The output repo, pfs/out, is not directly accessible. To write into the output repo, you must use the put file API call via any of the following:
    • pachctl put file
    • A Pachyderm SDK (for Go or Python)
    • Your own API client.
  • Pachyderm CLI (PachCTL) is packaged in the base image of your spout as well as your authentication information. As a result, the authentication is seamless when using PachCTL.
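As a rough sketch of the pachctl route, spout code written in Python could shell out to pachctl put file and stream the payload on stdin. The helper names are hypothetical, the repo, branch, and path values are placeholders, and the sketch assumes the repo@branch:/path argument form with -f - for reading from stdin.

```python
import subprocess


def put_file_argv(repo, branch, path):
    """Build the argument list for `pachctl put file`, reading data from stdin."""
    return ["pachctl", "put", "file", f"{repo}@{branch}:{path}", "-f", "-"]


def write_to_output(repo, branch, path, data):
    """Write `data` (bytes) to the given repo by piping it to pachctl.

    Raises subprocess.CalledProcessError if pachctl exits with a non-zero code.
    """
    subprocess.run(put_file_argv(repo, branch, path), input=data, check=True)


# Placeholder values: a spout named "my-spout" writing to its own output repo.
argv = put_file_argv("my-spout", "master", "/events/1.json")
```

A spout loop would call write_to_output once per received event; building the argument list separately keeps the command easy to inspect and test without a running cluster.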

Diagram #

[Figure: spout-tldr diagram]

When to Use #

You should use the spout field in a Pachyderm Pipeline Spec when you want to read data from an external source that is not stored in a Pachyderm repository. This can be useful in situations where you need to read data from a service that is not integrated with Pachyderm, such as an external API or a message queue.

Example scenarios:

  • Data ingestion: If you have an external data source, such as a web service, that you want to read data from and process with Pachyderm, you can use the spout field to read the data into Pachyderm.

  • Real-time data processing: If you need to process data in real-time and want to continuously read data from an external source, you can use the spout field to read the data into Pachyderm and process it as it arrives.

  • Data integration: If you have data stored in an external system, such as a message queue or a streaming service, and you want to integrate it with data stored in Pachyderm, you can use the spout field to read the data from the external system and process it in Pachyderm.

Example #

{
  "pipeline": {
    "name": "my-spout"
  },
  "spout": {},
  "transform": {
    "cmd": [ "go", "run", "./main.go" ],
    "image": "myaccount/myimage:0.1",
    "env": {
        "HOST": "kafkahost",
        "TOPIC": "mytopic",
        "PORT": "9092"
    }
  }
}

💡

For a first overview of how spouts work, see our spout101 example.

Tolerations PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {...},
    "tolerations": [
    {
      "key": "dedicated",
      "operator": "EXISTS",
      "effect": "NO_SCHEDULE"
    }
  ],
    ...
}

Behavior #

Pipeline tolerations enable you to run a pipeline on a node that has a taint.

  • You can have as many tolerations as you’d like, or none at all.
  • Tolerations behave almost exactly like those in the Kubernetes API, with the exception of some enums such as Exists and DoesNotExist being replaced with Golang equivalents like EXISTS and DOES_NOT_EXIST.
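The enum renaming described above follows a CamelCase to UPPER_SNAKE_CASE pattern. The sketch below shows that conversion, assuming the pattern holds for every enum value (an assumption based on the examples given here and in the spec above); to_pachyderm_enum is a hypothetical helper.

```python
import re


def to_pachyderm_enum(kubernetes_value):
    """Convert a CamelCase Kubernetes enum value to UPPER_SNAKE_CASE."""
    # Insert "_" at each lowercase-to-uppercase boundary, then upper-case it all.
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", kubernetes_value).upper()


print(to_pachyderm_enum("Exists"))      # EXISTS
print(to_pachyderm_enum("NoSchedule"))  # NO_SCHEDULE
```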

Example of Tainting a Node #

kubectl taint node example dedicated:NoSchedule

Transform PPS

Spec #

This is a top-level attribute of the pipeline spec.

{
    "pipeline": {...},
    "transform": {
        "image": string,
        "cmd": [ string ],
        "datumBatching": bool,
        "errCmd": [ string ],
        "env": {
            string: string
        },

        "secrets": [ {
            "name": string,
            "mountPath": string
        },
        {
            "name": string,
            "envVar": string,
            "key": string
        } ],
        "imagePullSecrets": [ string ],
        "stdin": [ string ],
        "errStdin": [ string ],
        "acceptReturnCode": [ int ],
        "debug": bool,
        "user": string,
        "workingDir": string,
        "dockerfile": string,
        "memoryVolume": bool,
    },
    ...
}

Attributes #

| Attribute | Description |
| --------- | ----------- |
| cmd | Passes a command to the Docker run invocation. |
| datumBatching | Enables you to call your user code once for a batch of datums versus calling it per each datum. |
| stdin | Passes an array of lines to your command on stdin. |
| errCmd | Passes a command executed on failed datums. |
| errStdin | Passes an array of lines to your error command on stdin. |
| env | Enables a key-value map of environment variables that Pachyderm injects into the container. |
| secrets | Passes an array of secrets to embed sensitive data. |
| imagePullSecrets | Passes an array of secrets that are mounted before the containers are created. |
| acceptReturnCode | Passes an array of return codes that are considered acceptable when your Docker command exits. |
| debug | Enables debug logging for the pipeline. |
| user | Sets the user that your code runs as. |
| workingDir | Sets the directory that your command runs from. |
| memoryVolume | Sets pachyderm-worker’s emptyDir.Medium to Memory, allowing Kubernetes to mount a memory-backed volume (tmpfs). |

Behavior #

  • cmd is not run inside a shell which means that wildcard globbing (*), pipes (|), and file redirects (> and >>) do not work. To specify these settings, you can set cmd to be a shell of your choice, such as sh and pass a shell script to stdin.
  • errCmd can be used to ignore failed datums while still writing successful datums to the output repo, instead of failing the whole job when some datums fail. The transform.errCmd command has the same limitations as transform.cmd.
  • stdin lines do not have to end in newline characters.
  • The following environment variables are automatically injected into the container:
    • PACH_JOB_ID – the ID of the current job.
    • PACH_OUTPUT_COMMIT_ID – the ID of the commit in the output repo for the current job.
    • <input>_COMMIT - the ID of the input commit. For example, if your input is the images repo, this will be images_COMMIT.
  • secrets reference Kubernetes secrets by name and specify a path to map the secrets or an environment variable (envVar) that the value should be bound to.
  • 0 is always considered a successful exit code.
  • With memoryVolume enabled, tmpfs is cleared on node reboot, and any files you write count against your container’s memory limit. This may be useful for workloads that are IO heavy or use memory caches.

💡

**Using a private registry?**

You can use imagePullSecrets to mount a secret that contains your registry credentials.

{
  "pipeline": {
    "name": "pipeline-a"
  },
  "description": "...",
  "transform": {
    "cmd": [ "python3", "/example.py" ],
    "image": "<private container registry>/image:1.0",
    "imagePullSecrets": [ "k8s-secret-with-creds" ]
  },
  ...
}
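The earlier point about cmd not running inside a shell can be illustrated with a small helper that wraps shell lines into a transform: the shell becomes cmd, and the script is passed on stdin. This is a sketch only; shell_transform is a hypothetical helper, and the image name and script line are placeholders.

```python
import json


def shell_transform(image, script_lines, shell="sh"):
    """Build a transform block that runs `script_lines` through a shell.

    Globbing, pipes, and redirects work because the shell interprets the
    lines it reads on stdin, rather than cmd being executed directly.
    """
    return {"image": image, "cmd": [shell], "stdin": list(script_lines)}


# Placeholder image and script: concatenate all input files into one output file.
transform = shell_transform(
    "ubuntu:22.04",
    ["cat /pfs/images/* > /pfs/out/all.txt"],
)
print(json.dumps({"transform": transform}, indent=2))
```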

When to Use #

You must always use the transform attribute when making a pipeline.

Full Pipeline Specification

{
  "pipeline": {
    "name": string,
    "project": {
      "name": "projectName"
    },
  },
  "description": string,
  "metadata": {
    "annotations": {
        "annotation": string
    },
    "labels": {
        "label": string
    }
  },
  "tfJob": {
    "tfJob": string,
  },
  "transform": {
    "image": string,
    "cmd": [ string ],
    "errCmd": [ string ],
    "env": {
        string: string
    },
    "secrets": [ {
        "name": string,
        "mountPath": string
    },
    {
        "name": string,
        "envVar": string,
        "key": string
    } ],
    "imagePullSecrets": [ string ],
    "stdin": [ string ],
    "errStdin": [ string ],
    "acceptReturnCode": [ int ],
    "debug": bool,
    "user": string,
    "workingDir": string,
    "dockerfile": string,
    "memoryVolume": bool,
  },
  "parallelismSpec": {
    "constant": int
  },
  "egress": {
    // Egress to an object store
    "URL": "s3://bucket/dir"
    // Egress to a database
    "sqlDatabase": {
        "url": string,
        "fileFormat": {
            "type": string,
            "columns": [string]
        },
        "secret": {
            "name": string,
            "key": "PACHYDERM_SQL_PASSWORD"
        }
    }
  },
  "update": bool,
  "outputBranch": string,
  [
    {
      "workerId": string,
      "jobId": string,
      "datumStatus" : {
        "started": timestamp,
        "data": []
      }
    }
  ],
  "s3Out": bool,
  "resourceRequests": {
    "cpu": number,
    "memory": string,
    "gpu": {
      "type": string,
      "number": int
    },
    "disk": string,
  },
  "resourceLimits": {
    "cpu": number,
    "memory": string,
    "gpu": {
      "type": string,
      "number": int
    },
    "disk": string,
  },
  "sidecarResourceLimits": {
    "cpu": number,
    "memory": string,
    "gpu": {
      "type": string,
      "number": int
    },
    "disk": string,
  },
  "input": {
    <"pfs", "cross", "union", "join", "group" or "cron" see below>
  },
  "description": string,
  "reprocess": bool,
  "service": {
    "internalPort": int,
    "externalPort": int
  },
  "spout": {
    // Optionally, you can combine a spout with a service:
    "service": {
      "internalPort": int,
      "externalPort": int
    }
  },
  "datumSetSpec": {
    "number": int,
    "sizeBytes": int,
    "perWorker": int,
  },
  "datumTimeout": string,
  "jobTimeout": string,
  "salt": string,
  "datumTries": int,
  "schedulingSpec": {
    "nodeSelector": {string: string},
    "priorityClassName": string
  },
  "podSpec": string,
  "podPatch": string,
  "specCommit": {
    "option": false,
    "branch": {
      "option": false,
      "repo": {
        "option": false,
        "name": string,
        "type": string,
        "project":{
          "option": false,
          "name": string,
        },
      },
      "name": string
    },
    "id": string,
  },
  "metadata": {

  },
  "reprocessSpec": string,
  "autoscaling": bool
}

------------------------------------
"pfs" input
------------------------------------

"pfs": {
  "name": string,
  "repo": string,
  "repoType":string,
  "branch": string,
  "commit":string,
  "glob": string,
  "joinOn":string,
  "outerJoin": bool,
  "groupBy": string,
  "lazy": bool,
  "emptyFiles": bool,
  "s3": bool,
  "trigger": {
    "branch": string,
    "all": bool,
    "cronSpec": string,
  },
}

------------------------------------
"cross" or "union" input
------------------------------------

"cross" or "union": [
  {
    "pfs": {
      "name": string,
      "repo": string,
      "branch": string,
      "glob": string,
      "lazy": bool,
      "emptyFiles": bool,
      "s3": bool
    }
  },
  {
    "pfs": {
      "name": string,
      "repo": string,
      "branch": string,
      "glob": string,
      "lazy": bool,
      "emptyFiles": bool,
      "s3": bool
    }
  }
  ...
]


------------------------------------
"join" input
------------------------------------

"join": [
  {
    "pfs": {
      "name": string,
      "repo": string,
      "branch": string,
      "glob": string,
      "joinOn": string,
      "outerJoin": bool,
      "lazy": bool,
      "emptyFiles": bool,
      "s3": bool
    }
  },
  {
    "pfs": {
      "name": string,
      "repo": string,
      "branch": string,
      "glob": string,
      "joinOn": string,
      "outerJoin": bool,
      "lazy": bool,
      "emptyFiles": bool,
      "s3": bool
    }
  }
]


------------------------------------
"group" input
------------------------------------

"group": [
  {
    "pfs": {
      "name": string,
      "repo": string,
      "branch": string,
      "glob": string,
      "groupBy": string,
      "lazy": bool,
      "emptyFiles": bool,
      "s3": bool
    }
  },
  {
    "pfs": {
      "name": string,
      "repo": string,
      "branch": string,
      "glob": string,
      "groupBy": string,
      "lazy": bool,
      "emptyFiles": bool,
      "s3": bool
    }
  }
]



------------------------------------
"cron" input
------------------------------------

"cron": {
    "name": string,
    "spec": string,
    "repo": string,
    "start": time,
    "overwrite": bool
}
