tree: feef427add5c954ccdc9ca1ac19b8ed094af2cb2 [path history] [tgz]
  1. plugins/
  2. pydevlake/
  3. test/
  4. build.sh
  5. README.md
  6. requirements.txt
  7. run_tests.sh
backend/python/README.md

Pydevlake

Pydevlake is a framework for writing plugins plugins for DevLake. The framework source code can be found in here.

How to create a new plugin

Create the plugin project

Make sure you have Poetry installed. Move to python/plugins and execute poetry new myplugin. This will generate a new directory for your plugin.

In the pyproject.toml file and add the following line at the end of the [tool.poetry.dependencies] section:

pydevlake = { path = "../../pydevlake", develop = true }

Now run poetry install.

Create main file

Create a main.py file with the following content:

from typing import Iterable

import pydevlake as dl


class MyPluginConnection(dl.Connection):
    pass


class MyPluginTransformationRule(dl.TransformationRule):
    pass


class MyPluginToolScope(dl.ToolScope):
    pass


class MyPlugin(dl.Plugin):
    connection_type = MyPluginConnection
    transformation_rule_type =  MyPluginTransformationRule
    tool_scope_type = MyPluginToolScope
    streams = []

    def domain_scopes(self, tool_scope: MyScope) -> Iterable[dl.DomainScope]:
        ...

    def remote_scope_groups(self, connection: MyPluginConnection) -> Iterable[dl.RemoteScopeGroup]:
        ...

    def remote_scopes(self, connection, group_id: str) -> Iterable[MyPluginToolScope]:
        ...

    def test_connection(self, connection: MyPluginConnection):
        ...


if __name__ == '__main__':
    MyPlugin.start()

This file is the entry point to your plugin. It specifies three datatypes:

  • A connection that groups the parameters that your plugin needs to collect data, e.g. the url and credentials to connect to the datasource
  • A transformation rule that groups the parameters that your plugin uses to convert some data, e.g. regexes to match issue type from name.
  • A tool layer scope type that represents the top-level entity of this plugin, e.g. a board, a repository, a project, etc.

The plugin class declares what are its connection, transformation rule and tool scope types. It also declares its list of streams, and is responsible to define 4 methods that we'll cover hereafter.

We also need to create two shell scripts in the plugin root directory to build and run the plugin. Create a build.sh file with the following content:

#!/bin/bash

cd "$(dirname "$0")"
poetry install

And a run.sh file with the following content:

#!/bin/bash

cd "$(dirname "$0")"
poetry run python myplugin/main.py "$@"

Connection parameters

The parameters of your plugin split between those that are required to connect to the datasource that are grouped in your connection class and those that are used to customize conversion to domain models that are grouped in your transformation rule class. For example, to add url and token parameter, edit MyPluginConnection as follow:

class MyPluginConnection(Connection):
    url: str
    token: str

All plugin methods that have a connection parameter will be called with an instance of this class. Note that you should not define __init__.

Transformation rule parameters

Transformation rules are used to customize the conversion of data from the tool layer to the domain layer. For example, you can define a regex to match issue type from issue name.

class MyPluginTransformationRule(TransformationRule):
    issue_type_regex: str

Not all plugins need transformation rules, so you can omit this class.

Tool scope type

The tool scope type is the top-level entity type of your plugin. For example, a board, a repository, a project, etc. A scope is connected to a connection, and all other collected entities are related to a scope. For example, a plugin for Jira will have a tool scope type of Board, and all other entities, such as issues, will belong to a single board.

Implement domain_scopes method

The domain_scopes method should return the list of domain scopes that are related to a given tool scope. Usually, this consists of a single domain scope, but it can be more than one for plugins that collect data from multiple domains.

from pydevlake.domain_layer.devops import CicdScope
...

class MyPlugin(dl.Plugin):
    ...

    def domain_scopes(self, tool_scope: MyPluginToolScope) -> list[dl.DomainScope]:
        yield CicdScope(
            name=tool_scope.name,
            description=tool_scope.description,
            url=tool_scope.url,
        )

Implement remote_scope and remote_scope_groups method

Those two methods are used by DevLake to list the available scopes in the datasource. The remote_scope_groups method should return a list of scope “groups” and the remote_scopes method should return the list of tool scopes in a given group.

class MyPlugin(dl.Plugin):
    ...

    def remote_scope_groups(self, connection: MyPluginConnection) -> Iterable[dl.RemoteScopeGroup]:
        api = ...
        response = ...
        for raw_group in response:
            yield RemoteScopeGroup(
                id=raw_group.id,
                name=raw_group.name,
            )

    def remote_scopes(self, connection, group_id: str) -> Iterable[MyPluginToolScope]:
        api = ...
        response = ...
        for raw_scope in response:
            yield MyPluginToolScope(
                id=raw_scope['id'],
                name=raw_scope['name'],
                description=raw_scope['description'],
                url=raw_scope['url'],
            )

Implement test_connection method

The test_connection method is used to test if a given connection is valid. It should check that the connection credentials are valid. If the connection is not valid, it should raise an exception.

class MyPlugin(dl.Plugin):
    ...

    def test_connection(self, connection: MyPluginConnection):
        api = ...
        response = ...
        if response.status_code != 401:
            raise Exception("Invalid credentials")
        if response.status_code != 200:
            raise Exception(f"Connection error {response}")

Add a new data stream

A data stream groups the logic for:

  • collecting the raw data from the datasource
  • extracting this raw data into a tool-specific model
  • converting the tool model into an equivalent DevLake domain model

Create a tool model

Create a models.py file. Then create a class that modelizes the data your stream is going to collect.

from pydevlake.model import ToolModel

class User(ToolModel, table=True):
    id: str = Field(primary_key=True)
    name: str
    email: str

Your tool model must declare at least one attribute as a primary key, like id in the example above. It must inherit from ToolModel, which in turn inherit from SQLModel, the base class of an ORM of the same name. You can use SQLModel features like declaring relationships with other models. Do not forget table=True, otherwise no table will be created in the database. You can omit it for abstract model classes.

To facilitate or even eliminate extraction, your tool models should be close to the raw data you collect. Note that if you collect data from a JSON REST API that uses camelCased properties, you can still define snake_cased attributes in your model. The camelCased attributes aliases will be generated, so no special care is needed during extraction.

Create the stream class

Create a new file for your first stream in a streams directory.

from pydevlake import Stream, DomainType
import pydevlake.domain_layer.crossdomain as cross

from myplugin.models import User


class Users(Stream):
    tool_model = ToolUser
    domain_models = [cross.User]

    def collect(self, state, context) -> Iterable[Tuple[object, dict]]:
        pass

    def extract(self, raw_data) -> ToolUser:
        pass

    def convert(self, user: ToolUser, context) -> Iterable[DomainUser]:
        pass

This stream will collect raw user data, e.g. as parsed JSON objects, extract this raw data as your tool-specific user model, then convert them into domain-layer user models.

The tool_model class attribute declares the tool model class that is extracted by this stream. The domain_domain class attribute is a list of domain models that are converted from the tool model. Most of the time, you will convert a tool model into a single domain model, but need to convert it into multiple domain models.

The collect method takes a state dictionary and a context object and yields tuples of raw data and new state. The last state that the plugin yielded for a given connection will be reused during the next collection. The plugin can use this state to store information necessary to perform incremental collection of data.

The extract method takes a raw data object and returns a tool model. This method has a default implementation that populates an instance of the tool_model class with the raw data. When you need to extract a nested value from JSON raw data, you can specify a JSON pointer (see RFC 6901) in the as source argument to a Field declaration.

class User(ToolModel, table=True):
    id: str = Field(primary_key=True)
    name: str
    email: str
    address: str = Field(source="/contactInfo/address")

Here the address field will be populated with the value of the address property of the contactInfo object property of the JSON object.

The convert method takes a tool-specific user model and convert it into domain level user models. Here the two models align quite well, the conversion is trivial:

def convert(self, user: ToolUser, context: Context) -> Iterable[DomainUser]:
    yield DomainUser(
        id=user.id,
        name=user.name
        email=user.email
    )

Substreams

Sometimes, a datasource is organized hierarchically. For example, in Jira an issue have many comments. In this case, you can create a substream to collect the comments of an issue. A substream is a stream that is executed for each element of a parent stream. The parent tool model, in our example an issue, is passed to the substream collect method as the parent argument.

import pydevlake as dl
import pydevlake.domain_layer.ticket as ticket

from myplugin.streams.issues import Issues

class Comments(dl.Substream):
    tool_model = IssueComment
    domain_models = [ticket.IssueComment]
    parent_stream = Issues

    def collect(self, state, context, parent: Issue) -> Iterable[Tuple[object, dict]]:
        ...

Create an API wrapper

Lets assume that your datasource is a REST API. We can create the following class to define it.

from pydevlake.api import API


class MyAPI(API):
    def __init__(self, url: str):
        self.url = url

    def users(self):
        return self.get(f'{self.url}/users')

By inheriting API you get access to facilities to wrap REST APIs. Here the users method will return a Response object that contains the results of calling GET on <url>/users.

Now we can go back to our stream file and implement collect:

from myplugin.api import MyAPI

...

    def collect(self, state, context) -> Iterable[Tuple[object, dict]]:
        api = MyAPI(context.connection.url)
        for user in api.users().json():
            yield user, state

...

If the API responds with a list of JSON object with properties matching your User model attributes, you're done!. Indeed extraction has a default implementation that takes of this common case. This is why it is important to make tool models that align with the data you collect.

If this is not the case, e.g. the attribute case mismatch, you can redefine the extract method:

...

class Users(Stream):
    ...

    def extract(self, raw_data: dict) -> ToolModel:
        return ToolUser(
            id=raw_data["ID"],
            name=raw_data["Name"],
            email=raw_data["Email"]
        )

    ...

Request and response hook

For each request sent and response received by your API wrapper, you can register hooks. Hooks allows you to implement authentication, pagination, and generic API error handling.

For example, lets assume that we are dealing with an API that require user to authenticate via a token set in a request header.

A request hook is declared in your API with a @request_hook decorator.

...
class MyAPI(API):
    def __init__(self, url, token):
        self.url = url
        self.token = token

    ...
    @request_hook
    def authenticate(self, request):
        if self.token:
            request.headers['Token'] = self.token
    ...

Here the method authenticate is a hook that is run on each request. Similarly you can declare response hooks with @response_hook. Multiple hooks are executed in the order of their declaration. The API base class declares some hooks that are executed first.

Pagination

One usage of a response hook is for handling paginated results. A response hook can be used to wrap the Response object in a PagedResponse object that support iteration and fetching other pages. This response hook is actually defined in API base class and expect your API wrapper to declare a paginator property.

You can subclass Paginator to provide API specific logic or reuse an existing implementation such as TokenPaginator. A token paginator assumes the API paginated responses are JSON object with one property that is an array of items and another that contains the token to the next page.

For example, the following paginator fetch items from the 'results' attribute, the next page from the 'nextPage' attribute and will issue requests with a page query parameter.

...
class MyAPI(API):
    ...
    paginator = TokenPaginator(
        items_attr='results',
        next_page_token_attr='nextPage',
        next_page_token_param='page'
    )
    ...

Substreams

With REST APIs, you often need to fetch a stream of items, and then to collect additional data for each of those items.

For example you might want to collect all UserComments written by each user collected via the Users stream.

To handle such cases, UserComments would inherit from Substream instead of Stream. A substream needs to specify which is his parent stream. The collect method of a substream will be called with each item collected from the parent stream.

...
from pydevlake import Substream
from myplugin.streams.users import Users

class UserComments(Substream):
    parent_stream = Users # Must specify the parent stream
    ...
    def collect(self, state: dict, context, user: User):
        """
        This method will be called for each user collected from parent stream Users.
        """
        for json in MyPluginAPI(context.connection.token).user_comments(user.id):
            yield json, state
    ...

Test the plugin standalone

To test your plugin manually, you can run your main.py file with different commands. You can find all those commands with --help cli flag:

poetry run myplugin/main.py --help

For testing, the interesting commands are collect/extract/convert. Each takes a context and a stream name. The context is a JSON object that must at least contain:

  • a db_url, e.g. you can use "sqlite+pysqlite:///:memory:" for an in-memory DB
  • a connection object containing the same attributes than your plugin connection type

Also, python plugins communicate with go side over an extra file descriptor 3, so you should redirect to stdout when testing your plugin.

console
CTX='{"db_url":"sqlite+pysqlite:///:memory:", "connection": {...your connection attrs here...}}'
poetry run myplugin/main.py $CTX users 3>&1

Automated tests

Make sure you have unit-tests written for your plugin code. The test files should end with _test.py, and are discovered and executed by the run_tests.sh script by the CICD automation. The test files should be placed inside the plugin project directory.

Debugging Python plugins

You need to have a Python remote-debugger installed to debug the Python code. This capability is controlled by the environment variable USE_PYTHON_DEBUGGER which is empty by default. The allowed debuggers as of now are:

  • pycharm

You will further have to set the environment variables PYTHON_DEBUG_HOST (The hostname/IP on which your debugger is running relative to the environment in which the plugin is running) and PYTHON_DEBUG_PORT (The corresponding debugger port). The variables should be set in the Go integration tests written in backend/test/integration/remote or Docker container/server env configuration. Once done, set breakpoints in the Python plugin code in your IDE, turn on the debugger in it, and those breakpoints should get hit.