A trained machine learning model alone will not add business value. The model must be integrated into the company’s IT infrastructure. Let’s develop a REST API microservice that classifies Iris flowers. The dataset consists of the sepal and petal lengths and widths of Iris flowers, and the target variable is the Iris variety: 0 – Setosa, 1 – Versicolor, 2 – Virginica.
Saving and loading a model
Before moving on to developing the API, we need to train and save the model. We’ll take a RandomForestClassifier. Then we’ll save the model to a file and load it again to make predictions. This can be done with pickle or joblib.
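For reference, a minimal training sketch might look like this; the split parameters and forest settings are illustrative, and the full training code is in the repository:
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# Load the Iris dataset into a DataFrame with the same column names
# that the service will use later
iris = load_iris()
all_columns = ['sepal length', 'sepal width', 'petal length', 'petal width']
X = pd.DataFrame(iris.data, columns=all_columns)
y = iris.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Train the classifier
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)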
import pickle

filename = 'model.pkl'
pickle.dump(clf, open(filename, 'wb'))
We’ll use pickle.load to load and validate the model.
loaded_model = pickle.load(open(filename, 'rb'))
result = loaded_model.score(X_test, y_test)
print(result)
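joblib works in the same way; a sketch, in case you prefer it (it is often more efficient for models that hold large numpy arrays):
import joblib

# Save and load the model with joblib instead of pickle
joblib.dump(clf, 'model.joblib')
loaded_model = joblib.load('model.joblib')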
The code for training, saving and loading the model is available in the repository — link
What REST APIs and microservices are
A microservice is a web service that addresses a single business need and can interact with other services in the IT infrastructure, for example over HTTP. An architecture consisting of several microservices is called a microservice architecture.
REST (Representational State Transfer) is a set of principles for organizing the interaction of services over the HTTP protocol. Clients send requests using the methods provided by HTTP to perform operations: for example, getting, adding, modifying or deleting data.
API (Application Programming Interface) — an interface for communication between services.
Microservice design
Let’s move on to practice to make this clearer and design our service. The service structure:
- rest_api.py — Flask application that interacts with the client and returns a model prediction
- model.py — file with functions for loading models
- models/ — folder with saved models
- logs/ — folder with logs
The API will be accessed at the following URL: http://[hostname]/iris/api/v1.0/getpred
The URL includes the application name and the API version. The application name identifies the service. The API version is useful when new versions of the service appear but the old calls need to be kept, for example for testing or because different systems use different versions of the API.
Next, we’ll create http://[hostname]/iris/api/v1.0/status/ to check the status of a request to the service and http://[hostname]/iris/api/v1.0/result/ to get the model’s results.
Let’s create a template of our service.
import os
from flask import Flask, jsonify, abort, make_response, request
import requests
import json
import time
import sys
import pandas as pd

app = Flask(__name__)


def launch_task(sepal_length, sepal_width, petal_length, petal_width, api):
    print(sepal_length, sepal_width, petal_length, petal_width, api)
    if api == 'v1.0':
        res_dict = {'Done': 'API exist'}
        return res_dict
    else:
        res_dict = {'error': 'API doesnt exist'}
        return res_dict


@app.route('/iris/api/v1.0/getpred', methods=['GET'])
def get_task():
    result = launch_task(request.args.get('sepal_length'), request.args.get('sepal_width'),
                         request.args.get('petal_length'), request.args.get('petal_width'), 'v1.0')
    return make_response(jsonify(result), 200)


if __name__ == '__main__':
    app.run(port=5000, debug=True)
The model is not used yet. A few points to note: the get_task function uses the GET method and receives as input the features the model needs. A call to the service looks like this: http://[hostname]/iris/api/v1.0/getpred?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2
The launch_task function will later work with the model, but for now it simply checks that the API version exists and prints the parameters passed to it to the console.
Let’s make sure everything works by running the application from the console:
python rest_api.py
Open http://127.0.0.1:5000/iris/api/v1.0/getpred?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2 in the browser and check in the browser and in the console that everything works.
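The same check can be made from Python with the requests library (assuming the service is running locally on port 5000, as above):
import requests

params = {'sepal_length': 5.1, 'sepal_width': 3.5,
          'petal_length': 1.4, 'petal_width': 0.2}
# At this stage the stub simply confirms the API version
resp = requests.get('http://127.0.0.1:5000/iris/api/v1.0/getpred', params=params)
print(resp.status_code, resp.json())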
The template is ready; now let’s refine it.
Error handling
Notice the line return make_response(jsonify(result), 200). 200 is the HTTP status code “OK”.
In real life, errors can occur in the service, so let’s add error handling. We’ll handle two common errors: 404 – “Not Found” and 500 – “Internal Server Error”.
@app.errorhandler(404)
def not_found(error):
    return make_response(jsonify({'code': 'PAGE_NOT_FOUND'}), 404)


@app.errorhandler(500)
def server_error(error):
    return make_response(jsonify({'code': 'INTERNAL_SERVER_ERROR'}), 500)
Working with the model
Let’s create a model.py file to load the trained model.
import pickle

PATH_TO_MODELS = 'models/'
filename = 'model.pkl'
model = PATH_TO_MODELS + filename


def load_model():
    loaded_model = pickle.load(open(model, 'rb'))
    return loaded_model
Now it’s time to update the launch_task function so that it returns the predictions made by the model.
import model as M  # model.py with the load_model function

model = M.load_model()
targets = ['setosa', 'versicolor', 'virginica']


def get_pred(sepal_length, sepal_width, petal_length, petal_width):
    all_columns = ['sepal length', 'sepal width', 'petal length', 'petal width']
    lst = [sepal_length, sepal_width, petal_length, petal_width]
    df = pd.DataFrame([lst], columns=all_columns)
    df = df.astype(float)
    result = model.predict_proba(df)
    predx = ['%.3f' % elem for elem in result[0]]
    preds_concat = pd.concat([pd.Series(targets), pd.Series(predx)], axis=1)
    preds = pd.DataFrame(data=preds_concat)
    preds.columns = ["class", "probability"]
    return preds.reset_index(drop=True)


def launch_task(sepal_length, sepal_width, petal_length, petal_width, api):
    pred_model = get_pred(sepal_length, sepal_width, petal_length, petal_width)
    if api == 'v1.0':
        res_dict = {'result': json.loads(pd.DataFrame(pred_model).to_json(orient='records'))}
        return res_dict
    else:
        res_dict = {'error': 'API doesnt exist'}
        return res_dict
We added model loading and a get_pred function which, given the feature values, returns a dataframe with the name of the Iris variety and the probability of belonging to each class. launch_task now returns the result as a JSON-serializable dictionary.
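For the example request from above, the JSON response has roughly this shape (the probability values are illustrative):
{
  "result": [
    {"class": "setosa", "probability": "0.980"},
    {"class": "versicolor", "probability": "0.020"},
    {"class": "virginica", "probability": "0.000"}
  ]
}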
It would seem that the service is ready, and in a sense it is. But let’s work on it some more.
Logging
Let’s implement logging in a microservice. Key points and errors during the microservice operation will be recorded in the log. Logging is implemented using the logging library.
import logging
logging.basicConfig(filename='logs/logs.log',level=logging.DEBUG)
Then, at the relevant points in the service, we write entries to the log.
logging.debug('Error')
logging.info('Information message')
logging.warning('Warning')
Each call writes a line to the log.
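With the default basicConfig format (level name, logger name, message), the three calls above would produce entries roughly like these:
DEBUG:root:Error
INFO:root:Information message
WARNING:root:Warning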
Redis-based task queue
In our example the model runs quickly. But imagine a model that works with images, video or text and needs noticeably more time, say 3-10 seconds. That means the client would be left waiting for the service to respond, so execution should be made asynchronous: the service does not wait for the processing to finish but keeps working independently. Flask does not support asynchronous work out of the box, so we will use the Python RQ tool. RQ stands for Redis Queue and runs on top of Redis. Note that RQ does not work under Windows.
How will it work? The client calls the microservice, the service returns a job_id, and the model processes the request in the background. The status of the request can be checked at http://[hostname]/iris/api/v1.0/status/ using the job_id. Once the status is success, the result of the model run can be fetched from http://[hostname]/iris/api/v1.0/result/, also by job_id.
import copy  # used later in the result endpoint
from rq import Queue, get_current_job
from redis import Redis

redis_conn = Redis(host='app-redis', port=6379)
queue = Queue('rest_api', connection=redis_conn, default_timeout=1200)
The execution timeout is set with default_timeout: here tasks that land in the queue named rest_api get 1200 seconds to run.
Let’s start the queue with the command
rq worker rest_api
For the microservice to keep up, start enough workers to process the load. If there are too few workers, tasks will wait in the queue and be executed as workers become free.
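For example, several workers can be started in the background like this (a sketch; in production they would normally be supervised by systemd, supervisor or separate containers):
# start three workers for the rest_api queue (illustrative)
rq worker rest_api &
rq worker rest_api &
rq worker rest_api &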
Let’s modify our get_task function to launch launch_task using a queue.
def get_response(dict, status=200):
    return make_response(jsonify(dict), status)


def get_job_response(job_id):
    return get_response({'job_id': job_id})


@app.route('/iris/api/v1.0/getpred', methods=['GET'])
def get_task():
    job_id = request.args.get('job_id')
    job = queue.enqueue('rest_api.launch_task', request.args.get('sepal_length'),
                        request.args.get('sepal_width'), request.args.get('petal_length'),
                        request.args.get('petal_width'), 'v1.0', job_id,
                        result_ttl=60 * 60 * 24, job_id=job_id)
    return get_job_response(job.get_id())
Note that an additional parameter, job_id, is now passed to launch_task, so launch_task must also accept a job_id argument. The result_ttl parameter sets how long the result is stored, in seconds; in the example the result is kept for one day.
Now, when the endpoint is called from the browser, a job_id is returned.
Now let’s implement a check of the execution status of the model. JSON is returned in the following format:
- code — response code: 404 — NOT_FOUND / PAGE_NOT_FOUND, 500 — INTERNAL_SERVER_ERROR, 200 — READY, 202 — NOT_READY
- status — success / error / running
def get_process_response(code, process_status, status=200):
    return get_response({
        'code': code,
        'status': process_status
    }, status)


@app.route('/iris/api/v1.0/status/<id>')
def status(id):
    job = queue.fetch_job(id)
    if job is None:
        return get_process_response('NOT_FOUND', 'error', 404)
    if job.is_failed:
        return get_process_response('INTERNAL_SERVER_ERROR', 'error', 500)
    if job.is_finished:
        return get_process_response('READY', 'success')
    return get_process_response('NOT_READY', 'running', 202)
Using job_id you can find out the execution status of the model. For example: http://[hostname]/iris/api/v1.0/status/[job_id]
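Depending on the state of the job, the status endpoint returns one of these bodies (illustrative examples matching the handler above):
{"code": "READY", "status": "success"}        (HTTP 200)
{"code": "NOT_READY", "status": "running"}    (HTTP 202)
{"code": "NOT_FOUND", "status": "error"}      (HTTP 404)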
It remains to implement the endpoint for obtaining the model result. If processing is not yet finished, it returns NOT_READY with status 202. If it is finished, it returns JSON with the name of the Iris variety and the probability of belonging to each class.
@app.route('/iris/api/v1.0/result/<id>')
def result(id):
    job = queue.fetch_job(id)
    if job is None:
        return get_process_response('NOT_FOUND', 'error', 404)
    if job.is_failed:
        return get_process_response('INTERNAL_SERVER_ERROR', 'error', 500)
    if job.is_finished:
        job_result = copy.deepcopy(job.result)
        result = {
            'result': job_result['result']
        }
        return get_response(result)
    return get_process_response('NOT_READY', 'running', 202)
In the same way, using job_id, we get the result of the model execution. For example: http://[hostname]/iris/api/v1.0/result/[job_id]
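Putting the three endpoints together, a client session might look like this sketch (it assumes the service runs locally on port 5000 and uses a one-second polling interval; error handling is omitted):
import time
import requests

BASE = 'http://127.0.0.1:5000/iris/api/v1.0'
params = {'sepal_length': 5.1, 'sepal_width': 3.5,
          'petal_length': 1.4, 'petal_width': 0.2}

# submit the task; the service returns a job_id immediately
job_id = requests.get(BASE + '/getpred', params=params).json()['job_id']

# poll the status endpoint until the job is done
while requests.get(BASE + '/status/' + job_id).json()['code'] != 'READY':
    time.sleep(1)

# fetch the prediction
print(requests.get(BASE + '/result/' + job_id).json())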
The microservice is now ready for integration with the IT infrastructure.
Integration with IT infrastructure
Let’s generate a requirements.txt file based on the modules and packages imported in the project. It can of course be written manually, but it’s easier to use pipreqs to generate the Python dependencies automatically.
Install pipreqs
pip install pipreqs
Launch
pipreqs /<your_project_path>/
Done
Unfortunately, the file still has to be edited by hand. Let’s add gunicorn, which will come in handy later (more on that below), and scikit_learn so the model can run. The scikit_learn version should be the one the model was trained with. You can check the version like this:
import sklearn
print('The scikit-learn version is {}.'.format(sklearn.__version__))
The content of the file looks like this:
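As an illustration, the generated file lists roughly these packages (pipreqs also pins the exact versions found in your environment; the versions are omitted here):
Flask
pandas
redis
requests
rq
gunicorn
scikit_learn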
Now let’s package the entire microservice in Docker. Docker is software for automating the deployment and management of applications in containerized environments.
Why Docker? The main advantage is fast deployment: Docker creates a container per process and does not boot a full OS, so everything happens in seconds.
Isolation and safety: with Docker, resources are isolated and shared. You can remove containers without fear; the removal is complete and clean, and only the assigned resources are used.
Another advantage is that the approach has become a de facto standard: almost all large companies build their infrastructure with Docker. Standardization reduces the time spent on defects and increases the time available for developing features.
Create Dockerfile:
FROM python:3.7-buster
RUN apt-get update -y
WORKDIR /usr/src/app
ENV LANG C.UTF-8
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY . .
Let’s analyze each line:
- FROM python:3.7-buster — base image
- RUN apt-get update -y — Update information about repositories inside the container
- WORKDIR /usr/src/app — Change the working directory inside the container. Subsequent commands will run in the /usr/src/app directory inside the container
- ENV LANG C.UTF-8 — Set the locale inside the container
- COPY requirements.txt ./ — copy our file with dependencies
- RUN pip install -r requirements.txt — Install dependencies saved in requirements.txt
- COPY . . — copy the new code to the container file system
Now let’s create a docker-compose.yml file to define a set of services.
version: '3'

services:
  iris:
    build: .
    image: iris:1.0
    container_name: iris
    ports:
      - 5000:5000
    extra_hosts:
      - "app-redis:[your IP]"
    command: /usr/src/app/start.sh
What to pay attention to here: command launches the start.sh file (more on it below), and extra_hosts adds a hostname mapping so the container can reach Redis. If you want to test locally, specify the IP of your own machine.
Now to the start.sh file:
#!/bin/bash

run_rq() {
    rq worker rest_api -u 'redis://app-redis:6379' 2>&1 | tee -a &
}

run_gunicorn() {
    gunicorn rest_api:app -b 0.0.0.0:5000 --workers=2 2>&1 | tee -a
}

run_rq
run_gunicorn
This script starts the already familiar queue worker and launches our microservice with Gunicorn. Gunicorn is a WSGI server built for UNIX systems; it is reasonably fast, light on resources, easy to start, and works with a wide variety of web frameworks.
Let’s test it. Run the command to build the image:
docker-compose build
Launch
docker-compose up
Running and ready to test.
We launched it this way to check that the container works. In production, do it like this right away:
docker-compose up -d --build --force-recreate
The -d flag runs the container in the background, --build rebuilds the image before starting, and --force-recreate recreates the container even if its configuration has not changed.
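To verify that the background container is up and to watch its output, commands like these can be used (the container name iris comes from docker-compose.yml):
docker ps             # the iris container should be listed
docker logs -f iris   # follow the gunicorn and rq output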
Conclusion
In this note we covered a large stack of technologies: we got acquainted with the microservice architecture, created a Flask microservice template, added logging to the service and met the Redis-based task queue. We also looked at integrating the solution into the IT infrastructure using Docker.
This note does not claim to be exhaustive, but it lets you quickly create microservices from this template, which suits both tabular data and computer vision tasks.
Link to the repository with the code from the note
Read more of my posts