
Machine Learning Models in production: Flask and REST API

A trained machine learning model alone will not add value for a business. The model must be integrated into the company’s IT infrastructure. Let’s develop a REST API microservice to classify Iris flowers. The dataset consists of the length and width of two parts of the Iris flower: the sepal and the petal. The target variable is the Iris variety: 0 – Setosa, 1 – Versicolor, 2 – Virginica.

Saving and loading a model

Before moving on to developing the API, we need to train and save the model. We’ll take a RandomForestClassifier model. Now let’s save the model to a file and load it back to make predictions. This can be done with pickle or joblib.
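
The pickle snippet below assumes an already trained classifier named clf. Here is a minimal training sketch (assuming scikit-learn’s built-in Iris dataset and a simple train/test split; the names clf, X_test and y_test match the snippets that follow):

from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Load the Iris dataset: 4 features, 3 target classes
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Train the random forest classifier
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)

With clf trained, we can save it with pickle.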

import pickle

filename = 'model.pkl'
pickle.dump(clf, open(filename, 'wb'))

We’ll use pickle.load to load and validate the model.

loaded_model = pickle.load(open(filename, 'rb'))
result = loaded_model.score(X_test, y_test) 
print(result)
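
Since joblib was mentioned as an alternative, here is a hedged sketch of the same save/load step with joblib (the file name model.joblib is arbitrary):

import joblib

# Save the trained model to disk
joblib.dump(clf, 'model.joblib')

# Load it back and evaluate on the test set
loaded_model = joblib.load('model.joblib')
print(loaded_model.score(X_test, y_test))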

The code for training, saving and loading the model is available in the repository — link

What is a REST API and what are microservices

A microservice is a web service that addresses a single business need and can interact with other services in the IT infrastructure, for example over HTTP. An architecture consisting of several microservices is called a microservice architecture.

REST (Representational State Transfer) is a set of principles for organizing the interaction between services over the HTTP protocol. Clients send requests using the methods provided by HTTP to perform operations: for example, getting, adding, modifying or deleting data.

API (Application Programming Interface) — an interface for communication between services.

Microservice design

Let’s move on to practice to make this clearer and design our service. Service structure:

  • rest_api.py — Flask application that interacts with the client and returns a model prediction
  • model.py — file with functions for loading models
  • models/ — folder with saved models
  • logs/ — folder with logs

The API will be accessed at the following URL — http://[hostname]/iris/api/v1.0/getpred

The URL includes the application name and the API version. The application name identifies the service. The API version is useful when new versions of the service appear but the old call needs to be kept, for example for testing or when the API differs between consuming systems.

Next, we’ll create http://[hostname]/iris/api/v1.0/status/ to check the status of a request to the service and http://[hostname]/iris/api/v1.0/result/ to get the model’s results.

Let’s create a template of our service.

import os
 
from flask import Flask, jsonify, abort, make_response, request
import requests
import json
import time
import sys
import pandas as pd
 
app = Flask(__name__)
 
def launch_task(sepal_length, sepal_width, petal_length, petal_width, api):

    # For now just check the API version and print the received features
    print(sepal_length, sepal_width, petal_length, petal_width, api)
    if api == 'v1.0':
        res_dict = {'Done': 'API exists'}
        return res_dict
    else:
        res_dict = {'error': "API doesn't exist"}
        return res_dict
 
@app.route('/iris/api/v1.0/getpred', methods=['GET'])
def get_task():
    # Read the features from the query string and pass them to launch_task
    result = launch_task(request.args.get('sepal_length'), request.args.get('sepal_width'), \
                        request.args.get('petal_length'), request.args.get('petal_width'), 'v1.0')

    return make_response(jsonify(result), 200)
 
if __name__ == '__main__':
    app.run(port=5000, debug=True)

The model is not used yet. A few points worth noting: the get_task function uses the GET method and receives as input the features the model needs. A call to our service looks like this: http://[hostname]/iris/api/v1.0/getpred?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2

The launch_task function will already work with the model, but for now it simply checks the availability of the API version and prints the parameters passed to it to the console.

Let’s make sure everything works by running our application in the console:

python rest_api.py

Now open http://127.0.0.1:5000/iris/api/v1.0/getpred?sepal_length=5.1&sepal_width=3.5&petal_length=1.4&petal_width=0.2 in the browser and check in the browser and the console that everything works fine.

Testing our API
Testing our API in the console
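
Besides the browser, you can test the endpoint from Python, for example with the requests library (a minimal sketch, assuming the service is running locally on port 5000):

import requests

params = {
    'sepal_length': 5.1,
    'sepal_width': 3.5,
    'petal_length': 1.4,
    'petal_width': 0.2,
}

# Call the service and print the JSON response
response = requests.get('http://127.0.0.1:5000/iris/api/v1.0/getpred', params=params)
print(response.status_code, response.json())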

The template is ready; now let’s refine it.

Error handling

Notice the line return make_response(jsonify(result), 200). 200 is the HTTP status code “OK”.

In real life, errors can occur in the service, so let’s add error handling. We will handle two common errors: 404 – “Not Found” and 500 – “Internal Server Error”.

@app.errorhandler(404)
def not_found(error):
    return make_response(jsonify({'code': 'PAGE_NOT_FOUND'}), 404)

@app.errorhandler(500)
def server_error(error):
    return make_response(jsonify({'code': 'INTERNAL_SERVER_ERROR'}), 500)

Working with the model

Let’s create a model.py file to load the trained model.

import pickle

PATH_TO_MODELS = 'models/'
filename = 'model.pkl'

model = PATH_TO_MODELS + filename

def load_model():
    loaded_model = pickle.load(open(model, 'rb'))
    return loaded_model

Now it’s time to update the launch_task function so that it returns the predictions made by the model.

import model as M  # the model.py module from above

model = M.load_model()
targets = ['setosa', 'versicolor', 'virginica']

def get_pred(sepal_length, sepal_width, petal_length, petal_width):
    
    all_columns = ['sepal length', 'sepal width', 'petal length', 'petal width']
    lst = [sepal_length, sepal_width, petal_length, petal_width]
    df = pd.DataFrame([lst], columns = all_columns)
    
    df = df.astype(float)
    result = model.predict_proba(df)
    predx = ['%.3f' % elem for elem in result[0]]
    preds_concat = pd.concat([pd.Series(targets), pd.Series(predx)], axis=1)
    preds = pd.DataFrame(data=preds_concat)
    preds.columns = ["class", "probability"]
    return preds.reset_index(drop=True)

def launch_task(sepal_length, sepal_width, petal_length, petal_width, api):
    
    pred_model = get_pred(sepal_length, sepal_width, petal_length, petal_width)

    if api == 'v1.0':
        res_dict = {'result':  json.loads( pd.DataFrame(pred_model).to_json(orient='records'))}
        return res_dict
    else:
        res_dict = {'error': "API doesn't exist"}
        return res_dict

We added model loading and a get_pred function which, given the feature values, returns a dataframe with the name of the Iris variety and the probability of belonging to each class. launch_task now returns a serialized JSON response.

Serialized JSON response

It would seem that the service is ready, and that’s true. But let’s work on it some more.

Logging

Let’s implement logging in the microservice. Key events and errors during the service’s operation will be recorded in the log. Logging is implemented using the logging library.

import logging
logging.basicConfig(filename='logs/logs.log', level=logging.DEBUG)

Then, wherever needed in the service, write an entry to the log.

logging.debug('Error')
logging.info('Information message')
logging.warning('Warning')

The log looks like this

Log
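
With the default basicConfig format, the entries in logs/logs.log look roughly like this (illustrative lines matching the calls above, not actual service output):

DEBUG:root:Error
INFO:root:Information message
WARNING:root:Warning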

Redis-based task queue

In our example, the model runs quickly. But imagine that the model works with images, video or text and takes noticeably longer to run, say 3-10 seconds. That means the client is left waiting for the service’s response, so execution should be made asynchronous: the service does not wait for the process to complete but keeps working independently. Unfortunately Flask does not support asynchronous work out of the box, so we will use the Python RQ tool. RQ stands for Redis Queue; the tool runs on top of Redis. Keep in mind that RQ will not work under Windows.

How will it work? The client calls the microservice, the service records a job_id, and the model processes the request in the background. You can find out the status of the request at http://[hostname]/iris/api/v1.0/status/ using the job_id. If the status is success, then at http://[hostname]/iris/api/v1.0/result, also by job_id, you get the result of the model execution.

from rq import Queue, get_current_job
from redis import Redis
redis_conn = Redis(host='app-redis', port=6379)
queue = Queue('rest_api', connection=redis_conn, default_timeout=1200)

The execution timeout is set with default_timeout. Here, tasks placed in the queue named rest_api are given 1200 seconds to execute.

Let’s start the queue with the command

rq worker rest_api

For the microservice to work, run enough workers to handle the load. If there are too few workers, tasks will wait in the queue and be executed as workers become free.

Let’s modify our get_task function to launch launch_task using a queue.

def get_response(dict, status=200):
    return make_response(jsonify(dict), status)

def get_job_response(job_id):
    return get_response({'job_id': job_id})

@app.route('/iris/api/v1.0/getpred', methods=['GET'])
def get_task():
    job_id = request.args.get('job_id')
    job = queue.enqueue('rest_api.launch_task', request.args.get('sepal_length'), request.args.get('sepal_width'), \
                         request.args.get('petal_length'), request.args.get('petal_width'), 'v1.0', job_id, result_ttl=60 * 60 * 24, \
                        job_id=job_id)

    return get_job_response(job.get_id())

Note that an additional parameter is now passed to launch_task – job_id. The result_ttl parameter controls how long the result is stored; the value is given in seconds. In the example, the retention period is one day.
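
Since enqueue now passes job_id, the launch_task signature has to accept it as well. A minimal sketch of the updated function (the body stays as before; job_id is simply available inside the worker):

def launch_task(sepal_length, sepal_width, petal_length, petal_width, api, job_id):
    # job_id is the extra parameter passed by queue.enqueue
    pred_model = get_pred(sepal_length, sepal_width, petal_length, petal_width)

    if api == 'v1.0':
        return {'result': json.loads(pd.DataFrame(pred_model).to_json(orient='records'))}
    return {'error': "API doesn't exist"}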

Now, when starting the microservice in the browser, the job_id is returned.

job_id when starting the service

Now let’s implement a check of the execution status of the model. JSON will be returned in the following format:

  • code — response code. 404 — NOT_FOUND, PAGE_NOT_FOUND. 500 — INTERNAL_SERVER_ERROR. 200 — READY. 202 — NOT_READY
  • status — success/error/running

def get_process_response(code, process_status, status=200):
    return get_response({
        'code': code,
        'status': process_status
    }, status)

@app.route('/iris/api/v1.0/status/<id>')
def status(id):
    job = queue.fetch_job(id)

    if (job is None):
        return get_process_response('NOT_FOUND', 'error', 404)

    if (job.is_failed):
        return get_process_response('INTERNAL_SERVER_ERROR', 'error', 500)

    if (job.is_finished):
        return get_process_response('READY', 'success')

    return get_process_response('NOT_READY', 'running', 202)

Using job_id you can find out the execution status of the model. For example: http://[hostname]/iris/api/v1.0/status/[job_id]

Check status

It remains to implement getting the result of the model. If processing is not yet completed, the function returns NOT_READY with code 202. If it is finished, it returns JSON with the name of the Iris variety and the probability of belonging to each class.

@app.route('/iris/api/v1.0/result/<id>')
def result(id):
    job = queue.fetch_job(id)
    
    if job is None:
        return get_process_response('NOT_FOUND', 'error', 404)

    if job.is_failed:
        return get_process_response('INTERNAL_SERVER_ERROR', 'error', 500)

    if job.is_finished:
        job_result = copy.deepcopy(job.result)  # requires "import copy" at the top of the file
        result = {
            'result': job_result['result']
        }

        return get_response(result)

    return get_process_response('NOT_READY', 'running', 202)

In the same way, using job_id, we get the result of the model execution. For example: http://[hostname]/iris/api/v1.0/result/[job_id]

The result of the microservice
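
Putting the three endpoints together, a client-side flow might look like this (a minimal sketch, assuming the service runs locally on port 5000 and using the requests library):

import time
import requests

BASE = 'http://127.0.0.1:5000/iris/api/v1.0'
params = {'sepal_length': 5.1, 'sepal_width': 3.5,
          'petal_length': 1.4, 'petal_width': 0.2}

# 1. Submit the task and remember the job_id
job_id = requests.get(BASE + '/getpred', params=params).json()['job_id']

# 2. Poll the status endpoint until the job is finished
while requests.get(BASE + '/status/' + job_id).json()['code'] != 'READY':
    time.sleep(1)

# 3. Fetch the prediction
print(requests.get(BASE + '/result/' + job_id).json())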

The microservice is now ready for integration with the IT infrastructure.

Integration with IT infrastructure

Let’s generate a requirements.txt file based on the modules and packages imported in the project. You can of course do this manually, but it’s better to use pipreqs to generate the Python dependencies automatically.

Install pipreqs

pip install pipreqs

Launch

pipreqs /<your_project_path>/

Done

The result of pipreqs

Unfortunately, you will still have to edit the file manually. Let’s add gunicorn there; it will come in handy, but more on that later. Also add scikit_learn so the model works. The scikit-learn version should be the one the model was trained with. You can check the version like this:

import sklearn

print('The scikit-learn version is {}.'.format(sklearn.__version__))

The content of the file looks like this:

Contents of the requirements.txt file

Now let’s package the entire microservice in Docker. Docker is software for automating the deployment and management of applications in containerized environments.

Why Docker? The main advantage is fast deployment. Docker creates a container for each process and does not boot the OS. Everything happens in seconds.

Isolation and security. With Docker, resources are isolated and shared. You don’t have to be afraid to remove containers; removal is complete and clean. Only the assigned resources are used.

Another advantage is the de facto standardization of the approach. Almost all large companies build their infrastructure using Docker. Standardization reduces the amount of time spent on defects and increases the amount of time available to develop features.

Create Dockerfile:

FROM python:3.7-buster

RUN apt-get update -y

WORKDIR /usr/src/app

ENV LANG C.UTF-8

COPY requirements.txt ./
RUN pip install -r requirements.txt

COPY . .

Let’s analyze each line:

  • FROM python:3.7-buster — base image
  • RUN apt-get update -y — Update information about repositories inside the container
  • WORKDIR /usr/src/app — Change the working directory inside the container. Subsequent commands will be run inside the /usr/src/app directory in the container
  • ENV LANG C.UTF-8 — Set the locale inside the container
  • COPY requirements.txt ./ — copy our file with dependencies
  • RUN pip install -r requirements.txt — Install dependencies saved in requirements.txt
  • COPY . . — copy the new code to the container file system

Now let’s create a docker-compose.yml file to define a set of services.

version: '3'
services:
  iris:
    build: .
    image: iris:1.0
    container_name: iris
    ports:
      - 5000:5000
    extra_hosts:
        - "app-redis:[your IP]"
    command: /usr/src/app/start.sh

What to pay attention to here: command launches the start.sh file (more on it a little later), and extra_hosts adds a hostname mapping so the container can reach Redis. If you want to test locally, specify the IP of your computer.

Let’s move on to the start.sh file.

#!/bin/bash

run_rq() {
  rq worker rest_api -u 'redis://app-redis:6379' 2>&1 | tee -a &
}

run_gunicorn() {
  gunicorn rest_api:app -b 0.0.0.0:5000 --workers=2 2>&1 | tee -a 
}

run_rq
run_gunicorn

This script starts the already familiar queue worker and launches our microservice with Gunicorn. Gunicorn is a WSGI server built for use on UNIX systems. It is relatively fast, light on resources, easy to start, and works with a wide variety of web frameworks.

Let’s test it: run the command to build the image

docker-compose build

Launch

docker-compose up

Running and ready to test.

Docker container testing

We launched Docker this way to check that the container works. In production, start it right away with:

docker-compose up -d --build --force-recreate

The -d flag is for running the container in the background.

Conclusion

In this note we covered a large stack of technologies. We got acquainted with the microservice architecture, created a template for a Flask microservice, added logging to the service, and got acquainted with a Redis-based task queue. We also looked separately at integrating the solution into the IT infrastructure using Docker.

This note is not intended to be complete, but it allows you to quickly create microservices from this template. This template is suitable both for working with tabular data and for computer vision tasks.

Link to the repository with the code from the note

Read more of my posts


If you liked the article, subscribe to my Telegram channel: https://t.me/renat_alimbekov

