Saturday, March 2, 2024

Python Pipeline: Here’s How to Build Your Python Package and install it in StreamSets

Coding in Python gives developers ultimate control over every aspect of their design, but a plethora of choices brings the danger of distraction. Low-code, graphical environments make components easy to operate and reuse, but offer less control than hand coding. Custom processors let data engineers operationalize their code and give coders powerful extensibility.

This Python pipeline walkthrough demonstrates how you can run your own Python code in StreamSets Data Collector.

This demo installs Data Collector using Docker. Your Python code is packaged and published to PyPI. You then add the Python package to a custom Docker image built on top of the StreamSets Docker image. Once Data Collector is running from your custom image, you simply import the Python package in the Jython Evaluator.

Let’s get started!


Prerequisites:

Have an account in PyPI.
Install twine to connect to PyPI.
Install Docker.
Have a GitHub account to upload your code.
Have an account in the StreamSets DataOps Platform.

High level steps:

Create a project for your Python package.
Build the project.
Upload it to PyPI.
Create a Dockerfile that installs the Python package.
Run the script generated from the StreamSets deployment with your custom image.
Create a pipeline with the Jython Evaluator.

Below are the steps to create your own Python package and upload it to PyPI.

PyPI is the official Python repository where Python packages are stored. You can think of it as the GitHub for Python packages.

To make your Python package available to people around the world, you'll need an account with PyPI.

I am using twine to upload the files to PyPI. You can install twine using the command below:

 pip install twine

Benefits and Limitations of a Python Pipeline in StreamSets


Benefits:

Customers can use their existing Python packages in StreamSets.
Users do not have to rewrite complex logic in StreamSets.

Limitations:

Customers have to rebuild their custom images during upgrades.

Python Calculator Project

Folder Structure

Content of the Addition module:

print("Performing Addition:")

# Return the sum of x and y
def add(x, y):
    return x + y

You can grab the complete code here on GitHub.
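The package imports shown later in this walkthrough reference three more functions: sub, div, and Mul. Their bodies are not shown in this post, so the following is only a minimal sketch, assuming each one mirrors add. The function names come from those import lines. The implementations are my assumption, and in the real package each function would live in its own module.

```python
# Hypothetical bodies for the remaining calculator functions.
# The names match the package's import lines; the implementations are assumed.

def add(x, y):
    return x + y


def sub(x, y):
    return x - y


def div(x, y):
    # Raises ZeroDivisionError when y is 0.
    # Under Python 2 (and Jython), "/" on two ints performs integer division.
    return x / y


def Mul(x, y):
    return x * y
```

Keeping one function per module, as the package does, means a pipeline script can import only the operation it needs.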

Create a file inside the WilsonCalculator folder and add the content below:

__author__ = 'wilson shamim'
__version__ = '1.1.0'

from WilsonCalculator.Addition import add
from WilsonCalculator.Subtraction import sub
from WilsonCalculator.Division import div
from WilsonCalculator.Multiplication import Mul

Create a under WilsonProject. can be found here.
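The build output later in this walkthrough ("running sdist", "running egg_info") is what setuptools prints. The project therefore most likely includes a setuptools setup script, conventionally named setup.py and built with python setup.py sdist. That file is not reproduced in this post, so the following is only a sketch. The metadata values are taken from names that appear in the walkthrough, while the file name, the build command, and the helper names build_setup_kwargs and main are assumptions.

```python
def build_setup_kwargs():
    """Collect package metadata using values that appear in this walkthrough."""
    return {
        "name": "WilsonCalculators",       # distribution name seen in the build output
        "version": "1.1.0",                # matches __version__
        "author": "wilson shamim",         # matches __author__
        "packages": ["WilsonCalculator"],  # the importable package folder
    }


def main():
    # Imported lazily so the metadata can be inspected without setuptools installed.
    from setuptools import setup
    setup(**build_setup_kwargs())
```

In an actual setup script, main() would be called at module level so that the build command can run it.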

Navigate to WilsonProject and run the command below to build the project:

python sdist

This will create the folders below:


wilsonshamim@Wilsons-MBP PythonTutorials % cd WilsonProject
wilsonshamim@Wilsons-MBP WilsonProject % python sdist
running sdist
running egg_info
creating WilsonCalculators.egg-info
writing WilsonCalculators.egg-info/PKG-INFO
writing top-level names to WilsonCalculators.egg-info/top_level.txt
writing dependency_links to WilsonCalculators.egg-info/dependency_links.txt
writing manifest file 'WilsonCalculators.egg-info/SOURCES.txt'
reading manifest file 'WilsonCalculators.egg-info/SOURCES.txt'
writing manifest file 'WilsonCalculators.egg-info/SOURCES.txt'
warning: sdist: standard file not found: should have one of README, README.rst, README.txt,
running check
warning: check: missing required meta-data: url
creating WilsonCalculators-1.1.0
creating WilsonCalculators-1.1.0/WilsonCalculator
creating WilsonCalculators-1.1.0/WilsonCalculators.egg-info
copying files to WilsonCalculators-1.1.0…
copying -> WilsonCalculators-1.1.0
copying WilsonCalculator/ -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculator/ -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculator/ -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculator/ -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculator/ -> WilsonCalculators-1.1.0/WilsonCalculator
copying WilsonCalculators.egg-info/PKG-INFO -> WilsonCalculators-1.1.0/WilsonCalculators.egg-info
copying WilsonCalculators.egg-info/SOURCES.txt -> WilsonCalculators-1.1.0/WilsonCalculators.egg-info
copying WilsonCalculators.egg-info/dependency_links.txt -> WilsonCalculators-1.1.0/WilsonCalculators.egg-info
copying WilsonCalculators.egg-info/top_level.txt -> WilsonCalculators-1.1.0/WilsonCalculators.egg-info
Writing WilsonCalculators-1.1.0/setup.cfg
creating dist
Creating tar archive
removing 'WilsonCalculators-1.1.0' (and everything under it)

Once the build is complete, two new folders, dist and WilsonCalculators.egg-info, are created.
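Before uploading, it can be worth checking what actually went into the archive under dist. The following is a minimal sketch using Python's standard tarfile module. The helper name list_sdist_members is hypothetical, and the archive path in the usage note is an assumption based on the package name and version shown in the build output.

```python
import tarfile


def list_sdist_members(archive_path):
    """Return the sorted member names of a gzip-compressed source archive."""
    with tarfile.open(archive_path, "r:gz") as archive:
        return sorted(archive.getnames())
```

For example, list_sdist_members("dist/WilsonCalculators-1.1.0.tar.gz") should list the package modules along with the egg-info metadata files, assuming the archive carries that name.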


Navigate to WilsonProject and run the command below to upload the project to PyPI:


twine upload dist/*


It will ask for your username and password.

Once the upload succeeds, you can install the package using pip:

pip install WilsonCalculators

Once it is installed successfully, you can use it in your code.
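Note that the name you install with pip (WilsonCalculators) differs from the name you import (WilsonCalculator). Here is a small sketch, written for Python 3, that checks whether the package is importable before using it. The helper name is_installed is hypothetical.

```python
import importlib.util


def is_installed(package_name):
    """Return True if a top-level package can be found on the import path."""
    return importlib.util.find_spec(package_name) is not None


if is_installed("WilsonCalculator"):
    # Import name (WilsonCalculator), not the pip name (WilsonCalculators).
    from WilsonCalculator.Addition import add
    print(add(2, 3))
else:
    print("WilsonCalculator is not installed in this environment")
```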


Next, create a Dockerfile with the content below:


FROM streamsets/datacollector:4.3.0
USER root
RUN apk add --update --no-cache bash \
    sudo && \
    echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf
RUN python2.7 -m ensurepip --default-pip
RUN python2 -m pip install WilsonCalculators

A Docker image named wilsoncalculator will be created:


ubuntu@ip-10-10-52-110:~/customImage$ docker images
REPOSITORY                                       TAG                                                   IMAGE ID            CREATED              SIZE
wilsoncalculator                                 latest                                                5a33c8117856        About a minute ago   952MB
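The build command that produces this image is not shown in this post. Here is a minimal sketch, assuming the Dockerfile sits in the current directory and the image is tagged with the standard docker build -t option. The helper names docker_build_command and build_image are hypothetical.

```python
import subprocess


def docker_build_command(tag, context_dir="."):
    """Assemble a `docker build` invocation as an argument list."""
    return ["docker", "build", "-t", tag, context_dir]


def build_image(tag, context_dir="."):
    # Runs the build; raises CalledProcessError if docker exits with an error.
    subprocess.run(docker_build_command(tag, context_dir), check=True)
```

Calling build_image("wilsoncalculator") from the folder that holds the Dockerfile would run the equivalent of docker build -t wilsoncalculator .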

Now we can use this image to run the engine and connect to the DataOps Platform.

You can refer to the documentation for creating deployments and obtaining the engine install script. Once you have the install script from the deployment, replace streamsets/datacollector:<version> with wilsoncalculator:


docker run -d -e STREAMSETS_DEPLOYMENT_SCH_URL= -e STREAMSETS_DEPLOYMENT_ID=<deployment ID>  -e STREAMSETS_DEPLOYMENT_TOKEN=<Token> wilsoncalculator:latest


ubuntu@ip-10-10-52-110:~/customImage$ docker ps
CONTAINER ID        IMAGE                                             COMMAND                  CREATED             STATUS              PORTS                                                NAMES
3d59d6d37d45        wilsoncalculator:latest                           "…"   15 seconds ago      Up 13 seconds       18630/tcp

Now you can create your Python pipeline.

I have created a WilsonCalculator pipeline with the Jython Evaluator.

Jython scripts:




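import sys
from WilsonCalculator.Addition import add

# Sample Jython code
for record in sdc.records:
  try:
    # Log the inputs and their sum (the log calls are assumed to be sdc.log.info)
    sdc.log.info("Start calculation.....")
    sdc.log.info(str(record.value['input1']))
    sdc.log.info(str(record.value['input2']))
    sdc.log.info(str(add(int(record.value['input1']), int(record.value['input2']))))
    sdc.log.info("----------------------")
  except Exception as e:
    # Send record to error
    sdc.error.write(record, str(e))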
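The per-record logic of the script above can also be exercised outside Data Collector, which is handy before rebuilding the image. The following is only a sketch in plain Python: StubRecord and process are hypothetical stand-ins for the record object and the evaluator loop, and the local add stands in for WilsonCalculator.Addition.add.

```python
class StubRecord(object):
    """Hypothetical stand-in for a Data Collector record with a dict value."""

    def __init__(self, value):
        self.value = value


def add(x, y):
    # Stand-in for WilsonCalculator.Addition.add.
    return x + y


def process(records, log, on_error):
    """Add input1 and input2 for each record; route failures to on_error."""
    for record in records:
        try:
            total = add(int(record.value['input1']), int(record.value['input2']))
            log("Addition: " + str(total))
        except Exception as e:
            on_error(record, str(e))


logged, errors = [], []
records = [
    StubRecord({'input1': '2', 'input2': '3'}),
    StubRecord({'input1': 'x', 'input2': '1'}),  # not a number, goes to error
]
process(records, logged.append, lambda record, message: errors.append(message))
```

After this runs, logged holds one message for the valid record and errors holds one message for the record whose input could not be converted.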


Generated 3 records

Logged the addition result.

This example shows both the benefits and the limitations of embedding Python in your smart data pipelines. StreamSets aims to bridge the gap between the ultimate control of hand coding and the ease and repeatability of a graphical interface.

With StreamSets you can:

Quickly build, deploy, and scale streaming, batch, CDC, ETL and ML pipelines
Handle data drift automatically, keeping jobs running even when schemas and structures change
Deploy, monitor, and manage all your data pipelines – across hybrid and multi-cloud – from a single dashboard 

Try smart data pipelines out yourself with StreamSets, a fully cloud-based, all-in-one DataOps platform. Sign up now and start building pipelines for free


The post Python Pipeline: Here’s How to Build Your Python Package and install it in StreamSets appeared first on StreamSets.
