Monday, July 15, 2024

Running ML models now easier with new Dataflow ML innovations on Apache Beam

According to Harvard Business Review, only 20% of companies see their AI models go into production. Google Cloud Dataflow builds on one of the most popular open source frameworks, Apache Beam, a unified programming model and SDK for developing batch and streaming pipelines. Continuing our commitment to building an open product and working closely with the Beam community, we’re excited to add three new machine learning (ML) focused features to Dataflow. They integrate tightly with the vast resources from the community and help simplify running streaming ML models at scale in production:

Automatic Model Refresh: Because your data and ML model are ever changing, models need to be continuously retrained and improved to remain effective. These model updates shouldn’t require a large effort by the data engineering teams to redeploy. The new streaming Automatic Model Refresh feature allows you to update models, hot swapping them in a running streaming pipeline with no pause in processing the stream of data, avoiding downtime. 

TensorFlow Hub integration: TensorFlow Hub is a repository of pre-trained machine learning models, and it’s a valuable resource for researchers and developers who want to quickly and easily deploy machine learning models in their applications. Open source models, individually or as an ensemble, are a common tool in any data scientist’s repertoire. For example, if you’re doing sentiment analysis on reviews posted by customers, you might use an open source model for the embeddings stage before passing the data to a custom, domain-specific model. To make this step easy for Apache Beam users, the TensorFlow Hub integration allows you to use just a few lines of code to download and consume a model in your batch or streaming Dataflow pipeline.

Model Ensembles: With the proliferation of ML frameworks, you don’t want to be restricted to using just one processing pipeline. The Apache Beam SDK allows you to use multiple frameworks, such as TensorFlow, PyTorch, and scikit-learn, in a single pipeline as an ensemble. The Apache Beam community recently added an ONNX model handler, extending the already expansive list of supported frameworks.

Streaming Automatic Model Refresh

Apache Beam has always been known for its ability to work with both streaming and batch using a single, unified API. This feature allows developers to move between batch and streaming without needing to change code or add a bunch of new import statements. Until now, however, every time a model was retrained, the operational engineers had to work through various pipeline lifecycle events. With the Automatic Model Refresh feature, the RunInference transform automatically updates the model handler without requiring an update to or redeployment of the whole pipeline.

Two modes are available with this feature: 

Watch mode, which pulls updates from Google Cloud Storage: The WatchFilePattern class watches a file_pattern for new model files and uses their timestamps to identify the latest one. It emits the latest ModelMetadata, which the RunInference PTransform uses to update the ML model without stopping the Beam pipeline.

Event mode: By connecting the pipeline to an unbounded source, such as Pub/Sub, update events are sent directly to the transform, initiating a hot swap of the model on demand. 
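The selection logic behind watch mode can be sketched in plain Python. This is only an illustration of the idea, not Beam’s implementation: FileInfo, latest_match, HotSwappableModel, and load_toy_model are hypothetical names, and the bucket paths are made up. In a real pipeline, WatchFilePattern and RunInference do this work for you.

```python
from dataclasses import dataclass
from fnmatch import fnmatchcase
from typing import Callable, Iterable, Optional

Model = Callable[[float], float]


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for the metadata of a file in a bucket."""
    path: str
    updated_at: float  # last-modified time, seconds since the epoch


def latest_match(files: Iterable[FileInfo], file_pattern: str) -> Optional[FileInfo]:
    """Return the most recently updated file that matches file_pattern."""
    matches = [f for f in files if fnmatchcase(f.path, file_pattern)]
    return max(matches, key=lambda f: f.updated_at, default=None)


class HotSwappableModel:
    """Keeps serving predictions while the underlying model is replaced."""

    def __init__(self, load_model: Callable[[str], Model], initial_path: str):
        self._load_model = load_model
        self.model_path = initial_path
        self._model = load_model(initial_path)

    def maybe_update(self, candidate: Optional[FileInfo]) -> bool:
        """Swap in the candidate model if it is new; return True on a swap."""
        if candidate is None or candidate.path == self.model_path:
            return False
        # Load the new model fully before replacing the old one, so
        # predict() always has a usable model.
        new_model = self._load_model(candidate.path)
        self._model, self.model_path = new_model, candidate.path
        return True

    def predict(self, x: float) -> float:
        return self._model(x)


def load_toy_model(path: str) -> Model:
    """Toy loader: the 'model' multiplies its input by the version in the path."""
    version = int(path.rsplit("_v", 1)[1].split(".")[0])
    return lambda x: x * version
```

Calling maybe_update with the result of a periodic latest_match listing corresponds to watch mode. Event mode maps onto the same call, driven by an incoming update message instead of a listing.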

TensorFlow Hub integration

With the new native TensorFlow model handler, you can use TensorFlow Hub models in Dataflow pipelines by passing a URL to the model as an argument. TensorFlow Hub is a repository of pre-trained machine learning models available for use within the TensorFlow framework and its high-level wrapper Keras. You can leverage powerful models for a variety of tasks without having to train them. The repository contains over a thousand models, with more added through community contributions.

Let’s look at an example of image classification, a task common in a wide variety of industries, such as retail store management and e-commerce. In the following example, we use the mobilenet_v2 model from TensorFlow Hub. With just a few lines of code, our pipeline uses a model that can deal with millions of images. (A runnable copy of this notebook is available here.)

```python
from typing import Iterable

import apache_beam as beam
import numpy as np
import tensorflow as tf
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor

# The model handler downloads the model from TensorFlow Hub by URL.
model_handler = TFModelHandlerTensor(
    model_uri="https://tfhub.dev/google/tf2-preview/mobilenet_v2/classification/4")


class PostProcessor(beam.DoFn):
    """Process the PredictionResult to get the predicted label.
    Returns predicted label.
    """
    def setup(self):
        # Download the ImageNet class names once per worker.
        labels_path = tf.keras.utils.get_file(
            'ImageNetLabels.txt',
            'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'
        )
        self._imagenet_labels = np.array(open(labels_path).read().splitlines())

    def process(self, element: PredictionResult) -> Iterable[str]:
        predicted_class = np.argmax(element.inference)
        predicted_class_name = self._imagenet_labels[predicted_class]
        yield "Predicted Label: {}".format(predicted_class_name.title())


# img_tensor is not defined in this snippet; it is assumed to be a
# preprocessed image tensor prepared beforehand, as in the linked notebook.
with beam.Pipeline() as p:
    _ = (p
         | "Create PCollection" >> beam.Create([img_tensor])
         | "Perform inference" >> RunInference(model_handler)
         | "Post Processing" >> beam.ParDo(PostProcessor())
         | "Print" >> beam.Map(print))
```

To make this code work for millions of images, change the source to a bucket on an object store and use one of the many Apache Beam sinks to output the results to your desired destination. 

Model ensembles

Increasingly, even single business use cases are built using more than one framework. Therefore, RunInference also needs to support as many frameworks as possible. It adds this support through its model handlers. 

Before this release Apache Beam supported:

TensorFlow

PyTorch

scikit-learn

TensorRT Engine

The Beam community has added the ONNX runtime with an ONNX model handler. You can see an example pipeline at onnx_sentiment_classification.py.

You can read more about the use of multiple models in one pipeline here, which also includes a notebook that shows the BLIP and CLIP models used together.

Conclusion 

Data engineers, data scientists and developers can now take advantage of the latest features from Dataflow ML, while the Apache Beam community continues to add features to the RunInference transform to make ML productionisation and development easier and more flexible:

It’s now easier to create ensembles of models from different frameworks within the same pipeline.

We’ve reduced the amount of code that needs to be written when using TensorFlow and TensorFlow Hub models.

Continuous deployment to streaming pipelines is easier than ever with automatic model refreshes.

You can find details of these features and more example notebooks on the Dataflow ML documentation.
