In modern machine learning pipelines, feature engineering is often implemented as a separate service so it can scale independently from the training workflow.
In this section, you create a gRPC-based feature engineering service that generates features used by the machine learning pipeline.
The Flyte workflow will call this service during pipeline execution.
The feature engineering service acts as an external microservice used by the ML workflow.
Flyte Workflow
|
v
Feature Engineering Service (gRPC)
|
v
Generated Features for Model Training
Create a directory for the ML workflow project.
mkdir flyte-ml-pipeline
cd flyte-ml-pipeline
Create the gRPC service definition file.
vi feature.proto
Add the following code.
syntax = "proto3";

service FeatureService {
  rpc GenerateFeatures (FeatureRequest) returns (FeatureResponse);
}

message FeatureRequest {
  int32 value = 1;
}

message FeatureResponse {
  int32 feature = 1;
}
This file defines the service interface used by the workflow and the feature service.
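In `int32 value = 1;`, the `1` is the field number that identifies the field on the wire, not a default value. The following is a minimal, illustrative sketch of how a single int32 field is laid out in the protobuf wire format. The helper names `encode_varint` and `encode_int32_field` are hypothetical and are not part of any library; in the actual service you use the generated classes instead. One simplification to note: proto3 serializers omit a field whose value is 0, whereas this helper always emits it.

```python
# Illustrative sketch only: real code should use the generated feature_pb2 classes.
# encode_varint and encode_int32_field are hypothetical helper names.

def encode_varint(number: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if number < 0:
        raise ValueError("varint input must be non-negative")
    out = bytearray()
    while True:
        low_bits = number & 0x7F
        number >>= 7
        if number:
            out.append(low_bits | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low_bits)
            return bytes(out)


def encode_int32_field(field_number: int, value: int) -> bytes:
    """Encode one int32 field: a tag varint followed by the value varint."""
    if not -(2**31) <= value < 2**31:
        raise ValueError("value does not fit in int32")
    tag = (field_number << 3) | 0  # wire type 0 means varint
    # Negative int32 values are sign-extended to 64 bits before encoding.
    return encode_varint(tag) + encode_varint(value & 0xFFFFFFFFFFFFFFFF)


if __name__ == "__main__":
    print(encode_int32_field(1, 5).hex())    # 0805
    print(encode_int32_field(1, 150).hex())  # 089601
```

Because both messages use field number 1 for their only field, a request carrying `value = 5` and a response carrying `feature = 5` would have the same bytes; the RPC signature is what tells each side which message type to expect.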
Make sure the flyte-env virtual environment is active before running the following commands. If you opened a new terminal, reactivate it:
source ~/flyte-env/bin/activate
Compile the protobuf file to generate Python client and server code.
python -m grpc_tools.protoc \
-I. \
--python_out=. \
--grpc_python_out=. \
feature.proto
The command generates the following files:
feature_pb2.py
feature_pb2_grpc.py
These files contain the Python classes used by the gRPC server and client.
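Why this matters: the feature server and the Flyte workflow both import these generated modules, so they share one definition of the request and response messages.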
Create the server implementation.
vi feature_server.py
Add the following code.
from concurrent import futures

import grpc

import feature_pb2
import feature_pb2_grpc


class FeatureService(feature_pb2_grpc.FeatureServiceServicer):
    """Implements the FeatureService interface defined in feature.proto."""

    def GenerateFeatures(self, request, context):
        value = request.value
        # Derive a simple feature by scaling the input value.
        feature = value * 10
        print("Generating feature for:", value)
        return feature_pb2.FeatureResponse(feature=feature)


def serve():
    # Handle incoming RPCs on a pool of up to 10 worker threads.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    feature_pb2_grpc.add_FeatureServiceServicer_to_server(
        FeatureService(), server
    )
    # Listen on all interfaces, port 50051, without TLS.
    server.add_insecure_port("[::]:50051")
    server.start()
    print("Feature gRPC service running on port 50051")
    # Block until the process is interrupted.
    server.wait_for_termination()


if __name__ == "__main__":
    serve()
The service receives a value from the workflow and generates a derived feature used during model training.
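The transformation itself is plain Python (`value * 10`), so one option is to keep it in a small function that you can test without starting a gRPC server. The sketch below assumes a hypothetical helper named `compute_feature`, which is not part of the server code above. The range check is also an assumption about desired behavior: the response field is declared as `int32`, so a large input multiplied by 10 can exceed what the field can hold.

```python
# Hypothetical helper: the server in this section computes value * 10 inline.
INT32_MIN = -(2**31)
INT32_MAX = 2**31 - 1


def compute_feature(value: int) -> int:
    """Return the derived feature, or raise if it no longer fits in int32."""
    feature = value * 10
    if not INT32_MIN <= feature <= INT32_MAX:
        raise ValueError(f"feature {feature} does not fit in an int32 field")
    return feature


if __name__ == "__main__":
    print(compute_feature(5))  # 50
```

If you adopt a helper like this, `GenerateFeatures` could call it and report a failure to the client instead of attempting to build a response with an out-of-range value.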
Start the gRPC service.
python feature_server.py
The output is similar to:
Feature gRPC service running on port 50051
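The server runs in the foreground, so leave it running and use a second terminal for any checks. As a quick sanity check, the following sketch uses only the Python standard library to confirm that something is accepting TCP connections on port 50051. The function name `port_is_open` is hypothetical. A successful connection shows only that the port is open; it does not prove that the gRPC service behind it is healthy.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # 50051 is the port the feature server binds in feature_server.py.
    print("listening" if port_is_open("localhost", 50051) else "not reachable")
```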
In the next section, you will create a Flyte ML training workflow that calls this feature engineering service during pipeline execution.