Add WorkerResourceMixin better way to set CPU quotas and max memory (#110)

diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index ab8df5e..4164d78 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -23,10 +23,10 @@
 Workflow
 --------
 
-Workflow describe the whole things except `tasks`_ and `tasks dependence`_, which including
+Workflow describes the whole things except `tasks`_ and `tasks dependence`_, which includes
 name, schedule interval, schedule start time and end time. You would know scheduler 
 
-Workflow could be initialized in normal assign statement or in context manger.
+Workflow could be initialized in a normal assignment statement or within a context manger.
 
 .. code-block:: python
 
@@ -37,10 +37,10 @@
    with Workflow(name="my first workflow") as workflow:
        workflow.submit()
 
-Workflow is the main object communicate between *PyDolphinScheduler* and DolphinScheduler daemon.
-After workflow and task is be declared, you could use `submit` and `run` notify server your definition.
+Workflow is the main object communicating between *PyDolphinScheduler* and DolphinScheduler daemon.
+After workflow and task is declared, you could use `submit` and `run` to notify server your definition.
 
-If you just want to submit your definition and create workflow, without run it, you should use attribute `submit`.
+If you just want to submit your definition and create workflow, without running it, you should use attribute `submit`.
 But if you want to run the workflow after you submit it, you could use attribute `run`.
 
 .. code-block:: python
@@ -54,8 +54,8 @@
 Schedule
 ~~~~~~~~
 
-We use parameter `schedule` determine the schedule interval of workflow, *PyDolphinScheduler* support seven
-asterisks expression, and each of the meaning of position as below
+We use parameter `schedule` to determine the schedule interval of workflow, *PyDolphinScheduler* supports seven
+asterisks expression, and each of the meaning of position is as below
 
 .. code-block:: text
 
@@ -136,8 +136,8 @@
 Tasks
 -----
 
-Task is the minimum unit running actual job, and it is nodes of DAG, aka directed acyclic graph. You could define
-what you want to in the task. It have some required parameter to make uniqueness and definition.
+Task is the minimum unit running actual job, and it is a node of DAG, aka directed acyclic graph. You could define
+what you want in the task. It has some required parameters to make uniqueness and definition.
 
 Here we use :py:meth:`pydolphinscheduler.tasks.Shell` as example, parameter `name` and `command` is required and must be provider. Parameter
 `name` set name to the task, and parameter `command` declare the command you wish to run in this task.
@@ -147,15 +147,15 @@
    # We named this task as "shell", and just run command `echo shell task`
    shell_task = Shell(name="shell", command="echo shell task")
 
-If you want to see all type of tasks, you could see :doc:`tasks/index`.
+If you want to see all types of tasks, you could see :doc:`tasks/index`.
 
 Tasks Dependence
 ~~~~~~~~~~~~~~~~
 
-You could define many tasks in on single `Workflow`_. If all those task is in parallel processing,
-then you could leave them alone without adding any additional information. But if there have some tasks should
-not be run unless pre task in workflow have be done, we should set task dependence to them. Set tasks dependence
-have two mainly way and both of them is easy. You could use bitwise operator `>>` and `<<`, or task attribute 
+You could define many tasks in on single `Workflow`_. If all those tasks are in parallel processing,
+then you could leave them alone without adding any additional information. But if there are some tasks that should
+not be run unless pre task in workflow has been done, we should set task dependence to them. Set task dependence
+have two main ways and both of them are easy. You could use bitwise operator `>>` and `<<`, or task attribute 
 `set_downstream` and `set_upstream` to do it.
 
 .. code-block:: python
@@ -178,7 +178,7 @@
 Task With Workflow
 ~~~~~~~~~~~~~~~~~~
 
-In most of data orchestration cases, you should assigned attribute `workflow` to task instance to
+In most of data orchestration cases, you should assign attribute `workflow` to task instance to
 decide workflow of task. You could set `workflow` in both normal assign or in context manger mode
 
 .. code-block:: python
@@ -232,13 +232,13 @@
 --------------
 
 During workflow running, we may need some resource files to help us run task usually. One of a common situation
-is that we already have some executable files locally, and we need to schedule in specific time, or add them
-to existing workflow by adding the new tasks. Of cause, we can upload those files to target machine and run them
+is that we already have some executable files locally, and we need to schedule a specific time, or add them
+to existing workflow by adding the new tasks. Of course, we can upload those files to target machine and run them
 in :doc:`shell task <tasks/shell>` by reference the absolute path of file. But if we have more than one machine
 to run task, we have to upload those files to each of them. And it is not convenient and not flexible, because
 we may need to change our resource files sometimes.
 
-The more pydolphinscheduler way is to upload those files together with `workflow`_, and use them in task to run.
+One more pydolphinscheduler way is to upload those files together with `workflow`_, and use them in task to run.
 For example, you have a bash script named ``echo-ten.sh`` locally, and it contains some code like this:
 
 .. code-block:: bash
diff --git a/docs/source/start.rst b/docs/source/start.rst
index 434d80e..3c41471 100644
--- a/docs/source/start.rst
+++ b/docs/source/start.rst
@@ -18,7 +18,7 @@
 Getting Started
 ===============
 
-To get started with *PyDolphinScheduler* you must ensure python and pip
+To get started with *PyDolphinScheduler* you must ensure python and pip are
 installed on your machine, if you're already set up, you can skip straight
 to `Installing PyDolphinScheduler`_, otherwise please continue with
 `Installing Python`_.
@@ -28,16 +28,16 @@
 
 How to install `python` and `pip` depends on what operating system
 you're using. The python wiki provides up to date
-`instructions for all platforms here`_. When you entering the website
-and choice your operating system, you would be offered the choice and
-select python version. *PyDolphinScheduler* recommend use version above
-Python 3.6 and we highly recommend you install *Stable Releases* instead
+`instructions for all platforms here`_. When you enter the website
+and choose your operating system, you would be offered the choice and
+select python version. *PyDolphinScheduler* recommends using a version above
+Python 3.6 and we highly recommend installing *Stable Releases* instead
 of *Pre-releases*.
 
 After you have download and installed Python, you should open your terminal,
-typing and running :code:`python --version` to check whether the installation
-is correct or not. If all thing good, you could see the version in console
-without error(here is a example after Python 3.8.7 installed)
+type and run :code:`python --version` to check whether the installation
+is correct or not. If everything is good, you could see the version in console
+without error(here is an example after Python 3.8.7 is installed)
 
 .. code-block:: bash
 
@@ -49,21 +49,21 @@
 -----------------------------
 
 After Python is already installed on your machine following section
-`installing Python`_, it easy to *PyDolphinScheduler* by pip.
+`installing Python`_, it is easy to install *PyDolphinScheduler* using pip.
 
 .. code-block:: bash
 
    python -m pip install apache-dolphinscheduler
 
-The latest version of *PyDolphinScheduler* would be installed after you run above
+The latest version of *PyDolphinScheduler* would be installed after you run the above
 command in your terminal. You could go and `start Python Gateway Service`_ to finish
-the prepare, and then go to :doc:`tutorial` to make your hand dirty. But if you
+the preparation, and then go to :doc:`tutorial` to get your hand dirty. But if you
 want to install the unreleased version of *PyDolphinScheduler*, you could go and see
-section `installing PyDolphinScheduler in dev branch`_ for more detail.
+section `installing PyDolphinScheduler in dev branch`_ for more details.
 
 .. note::
 
-   Currently, we released multiple pre-release package in PyPI, you can see all released package
+   Currently, we have released multiple pre-release packages in PyPI, you can see all released packages
    including pre-release in `release history <https://pypi.org/project/apache-dolphinscheduler/#history>`_.
    You can fix the the package version if you want to install pre-release package, for example if
    you want to install version `3.0.0-beta-2` package, you can run command
@@ -72,9 +72,9 @@
 Installing PyDolphinScheduler In DEV Branch
 -------------------------------------------
 
-Because the project is developing and some of the features still not release.
-If you want to try some thing unreleased you could install from the source code
-which we hold in GitHub
+Because the project is developing and some of the features are still not released.
+If you want to try something unreleased you could install from the source code
+which we hold on GitHub
 
 .. code-block:: bash
 
@@ -84,10 +84,10 @@
    python -m pip install -e .
 
 After you installed *PyDolphinScheduler*, please remember `start Python Gateway Service`_
-which waiting for *PyDolphinScheduler*'s workflow definition require.
+which is required for *PyDolphinScheduler*'s workflow definition.
 
-Above command will clone whole dolphinscheduler source code to local, maybe you want to install latest pydolphinscheduler
-package directly and do not care about other code(including Python gateway service code), you can execute command
+Above command will clone whole dolphinscheduler source code to local, maybe you want to install the latest pydolphinscheduler
+package directly and do not care about other code(including Python gateway service code), you can execute the command
 
 .. code-block:: bash
 
@@ -98,10 +98,10 @@
 ----------------------------
 
 Since **PyDolphinScheduler** is Python API for `Apache DolphinScheduler`_, it
-could define workflow and tasks structure, but could not run it unless you
-`install Apache DolphinScheduler`_ and start its API server which including
-Python gateway service in it. We only and some key steps here and you could
-go `install Apache DolphinScheduler`_ for more detail
+could define workflow and task structures, but could not run it unless you
+`install Apache DolphinScheduler`_ and start its API server which includes
+Python gateway service in it. We only write some key steps here and you could
+go `install Apache DolphinScheduler`_ for more details
 
 .. code-block:: bash
 
@@ -109,7 +109,7 @@
    ./bin/dolphinscheduler-daemon.sh start api-server
 
 To check whether the server is alive or not, you could run :code:`jps`. And
-the server is health if keyword `ApiApplicationServer` in the console.
+the server is healthy if keyword `ApiApplicationServer` is in the console.
 
 .. code-block:: bash
 
@@ -120,14 +120,14 @@
 
 .. note::
 
-   Please make sure you already enabled started Python gateway service along with `api-server`. The configuration is in
+   Please make sure you already started Python gateway service along with `api-server`. The configuration is in
    yaml config path `python-gateway.enabled : true` in api-server's configuration path in `api-server/conf/application.yaml`.
-   The default value is true and Python gateway service start when api server is been started.
+   The default value is true and Python gateway service starts when api server is started.
 
 Run an Example
 --------------
 
-Before run an example for pydolphinscheduler, you should get the example code from it source code. You could run
+Before run an example for pydolphinscheduler, you should get the example code from its source code. You could run
 single bash command to get it
 
 .. code-block:: bash
@@ -156,19 +156,19 @@
 
 After that, you could go and see your DolphinScheduler web UI to find out a new workflow created by pydolphinscheduler,
 and the path of web UI is `Project -> Workflow -> Workflow Definition`, and you can see a workflow and workflow instance
-had been created and DAG is auto formatter by web UI.
+had been created and DAG is automatically formatted by web UI.
 
 .. note::
 
-   We have default authentication token when in first launch dolphinscheduler and pydolphinscheduler. Please change
+   We have default authentication token when you first launch dolphinscheduler and pydolphinscheduler. Please change
    the parameter ``auth_token`` when you deploy in production environment or test dolphinscheduler in public network.
-   See :ref:`authentication token <concept:authentication token>` for more detail.
+   See :ref:`authentication token <concept:authentication token>` for more details.
 
 
 What's More
 -----------
 
-If you do not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But
+If you are not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But
 if you already know the basic usage or concept of *PyDolphinScheduler*, you could go and play with all
 :doc:`tasks/index` *PyDolphinScheduler* supports, or see our :doc:`howto/index` about useful cases.
 
diff --git a/docs/source/tasks/datax.rst b/docs/source/tasks/datax.rst
index cb67a2f..f704e3f 100644
--- a/docs/source/tasks/datax.rst
+++ b/docs/source/tasks/datax.rst
@@ -27,6 +27,16 @@
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
 
+Resource Limit Example
+----------------------
+
+We can add resource limit like CPU quota and max memory by passing parameters when declaring tasks.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_datax_example.py
+   :start-after: [start resource_limit]
+   :end-before: [end resource_limit]
+
+
 Dive Into
 ---------
 
diff --git a/docs/source/tasks/python.rst b/docs/source/tasks/python.rst
index 1bf6210..40b0c6d 100644
--- a/docs/source/tasks/python.rst
+++ b/docs/source/tasks/python.rst
@@ -18,6 +18,27 @@
 Python
 ======
 
+A Python task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_python_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Resource Limit Example
+----------------------
+
+We can add resource limit like CPU quota and max memory by passing parameters when declaring tasks.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_python_example.py
+   :start-after: [start resource_limit]
+   :end-before: [end resource_limit]
+
+Dive Into
+---------
+
 .. automodule:: pydolphinscheduler.tasks.python
 
 
diff --git a/docs/source/tasks/pytorch.rst b/docs/source/tasks/pytorch.rst
index 4c7a552..7e3c034 100644
--- a/docs/source/tasks/pytorch.rst
+++ b/docs/source/tasks/pytorch.rst
@@ -28,6 +28,17 @@
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
 
+
+Resource Limit Example
+----------------------
+
+We can add resource limit like CPU quota and max memory by passing parameters when declaring tasks.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_pytorch_example.py
+   :start-after: [start resource_limit]
+   :end-before: [end resource_limit]
+
+
 Dive Into
 ---------
 
diff --git a/docs/source/tasks/shell.rst b/docs/source/tasks/shell.rst
index 2dd106a..13074fc 100644
--- a/docs/source/tasks/shell.rst
+++ b/docs/source/tasks/shell.rst
@@ -27,6 +27,15 @@
    :start-after: [start workflow_declare]
    :end-before: [end task_relation_declare]
 
+Resource Limit Example
+----------------------
+
+We can add resource limit like CPU quota and max memory by passing parameters when declaring tasks.
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial.py
+   :start-after: [start resource_limit]
+   :end-before: [end resource_limit]
+
 Dive Into
 ---------
 
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index 695c945..259e3e7 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -20,21 +20,21 @@
 
 This tutorial shows you the basic concept of *PyDolphinScheduler* and tells all
 things you should know before you submit or run your first workflow. If you
-still have not installed *PyDolphinScheduler* and start DolphinScheduler, you
-could go and see :ref:`how to getting start PyDolphinScheduler <start:getting started>` firstly.
+still have not installed *PyDolphinScheduler* and started DolphinScheduler, you
+could go and see :ref:`how to get started with PyDolphinScheduler <start:getting started>` firstly.
 
 Overview of Tutorial
 --------------------
 
-Here have an overview of our tutorial, and it looks a little complex but does not
-worry about that because we explain this example below as detail as possible.
+Here, we have an overview of our tutorial, and it looks a little complex but don't
+worry about that because we will explain this example below in as much detail as possible.
 
 There are two types of tutorials: traditional and task decorator.
 
 - **Traditional Way**: More general, support many :doc:`built-in task types <tasks/index>`, it is convenient
   when you build your workflow at the beginning.
-- **Task Decorator**: A Python decorator allow you to wrap your function into pydolphinscheduler's task. Less
-  versatility to the traditional way because it only supported Python functions and without build-in tasks
+- **Task Decorator**: A Python decorator that allows you to wrap your function into pydolphinscheduler's task. Less
+  versatility to the traditional way because it only supports Python functions without build-in tasks
   supported. But it is helpful if your workflow is all built with Python or if you already have some Python
   workflow code and want to migrate them to pydolphinscheduler.
 - **YAML File**: We can use pydolphinscheduler CLI to create workflow using YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`. 
@@ -94,7 +94,7 @@
 import them from `import necessary module`_. Here we declare basic arguments for workflow.
 We define the name of :code:`Workflow`, using `Python context manager`_ and it **the only required argument**
 for `Workflow`. Besides, we also declare three arguments named :code:`schedule` and :code:`start_time`
-which setting workflow schedule interval and schedule start_time, and argument :code:`tenant` defines which tenant
+which sets workflow schedule interval and schedule start_time, and argument :code:`tenant` defines which tenant
 will be running this task in the DolphinScheduler worker. See :ref:`section tenant <concept:tenant>` in
 *PyDolphinScheduler* :doc:`concept` for more information.
 
@@ -119,7 +119,7 @@
       :end-before: # Define the tasks within the workflow
       :language: yaml
 
-We could find more detail about :code:`Workflow` in :ref:`concept about workflow <concept:workflow>`
+We could find more details about :code:`Workflow` in :ref:`concept about workflow <concept:workflow>`
 if you are interested in it. For all arguments of object workflow, you could find in the
 :class:`pydolphinscheduler.core.workflow` API documentation.
 
@@ -128,7 +128,7 @@
 
 .. tab:: Tradition
 
-   We declare four tasks to show how to create tasks, and both of them are simple tasks of
+   We declare four tasks to show how to create tasks, and all of them are simple tasks of
    :class:`pydolphinscheduler.tasks.shell` which runs `echo` command in the terminal. Besides the argument
    `command` with :code:`echo` command, we also need to set the argument `name` for each task
    *(not only shell task, `name` is required for each type of task)*.
@@ -142,7 +142,7 @@
 
 .. tab:: Task Decorator
 
-   We declare four tasks to show how to create tasks, and both of them are created by the task decorator which
+   We declare four tasks to show how to create tasks, and all of them are created by the task decorator which
    using :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is add a decorator named
    :code:`@task` to existing Python function, and then use them inside :class:`pydolphinscheduler.core.workflow`
 
@@ -151,7 +151,7 @@
       :start-after: [start task_declare]
       :end-before: [end task_declare]
 
-   It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we only use
+   It makes our workflow more Pythonic, but be careful that when we use task decorator mode, it means we only use
    Python function as a task and could not use the :doc:`built-in tasks <tasks/index>` most of the cases.
 
 .. tab:: YAML File
diff --git a/setup.cfg b/setup.cfg
index f8f28d7..cca6044 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -182,7 +182,9 @@
     D105,
     # Conflict to Black
     # W503: Line breaks before binary operators
-    W503
+    W503,
+    # D400: First line should end with a period
+    D400
 per-file-ignores =
     */pydolphinscheduler/side/__init__.py:F401
     */pydolphinscheduler/tasks/__init__.py:F401
diff --git a/src/pydolphinscheduler/core/mixin.py b/src/pydolphinscheduler/core/mixin.py
new file mode 100644
index 0000000..1cf35b3
--- /dev/null
+++ b/src/pydolphinscheduler/core/mixin.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""WorkerResource Mixin"""
+
+
+class WorkerResourceMixin:
+    """Mixin object, declare some attributes for WorkerResource."""
+
+    def add_attr(self, **kwargs):
+        """Add attributes to WorkerResource, include cpu_quota and memory_max now."""
+        self._cpu_quota = kwargs.get("cpu_quota", -1)
+        self._memory_max = kwargs.get("memory_max", -1)
+        if hasattr(self, "_DEFINE_ATTR"):
+            self._DEFINE_ATTR |= {"cpu_quota", "memory_max"}
+
+    @property
+    def cpu_quota(self):
+        """Get cpu_quota."""
+        return self._cpu_quota
+
+    @property
+    def memory_max(self):
+        """Get memory_max."""
+        return self._memory_max
diff --git a/src/pydolphinscheduler/core/parameter.py b/src/pydolphinscheduler/core/parameter.py
index 6bac357..15ea723 100644
--- a/src/pydolphinscheduler/core/parameter.py
+++ b/src/pydolphinscheduler/core/parameter.py
@@ -49,7 +49,7 @@
 
     def __eq__(self, data):
         return (
-            type(self) == type(data)
+            type(self) is type(data)
             and self.data_type == data.data_type
             and self.value == data.value
         )
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 9ec53f0..9f6b644 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -273,13 +273,13 @@
         """Get task define attribute `resource_list`."""
         resources = set()
         for res in self._resource_list:
-            if type(res) == str:
+            if isinstance(res, str):
                 resources.add(
                     Resource(
                         name=res, user_name=self.user_name
                     ).get_fullname_from_database()
                 )
-            elif type(res) == dict and ResourceKey.NAME in res:
+            elif isinstance(res, dict) and ResourceKey.NAME in res:
                 warnings.warn(
                     """`resource_list` should be defined using List[str] with resource paths,
                        the use of ids to define resources will be remove in version 3.2.0.
diff --git a/src/pydolphinscheduler/examples/task_datax_example.py b/src/pydolphinscheduler/examples/task_datax_example.py
index 6fdf779..d463ff0 100644
--- a/src/pydolphinscheduler/examples/task_datax_example.py
+++ b/src/pydolphinscheduler/examples/task_datax_example.py
@@ -90,5 +90,17 @@
     # You can custom json_template of datax to sync data. This task create a new
     # datax job same as task1, transfer record from `first_mysql` to `second_mysql`
     task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
+
+    # [start resource_limit]
+    resource_limit = DataX(
+        name="resource_limit",
+        datasource_name="first_mysql",
+        datatarget_name="second_mysql",
+        sql="select id, name, code, description from source_table",
+        target_table="target_table",
+        cpu_quota=1,
+        memory_max=100,
+    )
+    # [end resource_limit]
     workflow.run()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_python_example.py b/src/pydolphinscheduler/examples/task_python_example.py
new file mode 100644
index 0000000..553c7a8
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_python_example.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""An example workflow for task python."""
+
+from pydolphinscheduler.core.workflow import Workflow
+from pydolphinscheduler.tasks.python import Python
+
+with Workflow(
+    name="task_python_example",
+) as workflow:
+    task_python = Python(
+        name="task",
+        definition="print('hello world.')",
+    )
+
+    # [start resource_limit]
+    python_resources_limit = Python(
+        name="python_resources_limit",
+        definition="print('hello world.')",
+        cpu_quota=1,
+        memory_max=100,
+    )
+    # [end resource_limit]
+
+    task_python >> python_resources_limit
+    workflow.submit()
+# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_pytorch_example.py b/src/pydolphinscheduler/examples/task_pytorch_example.py
index 8e431d5..a587f53 100644
--- a/src/pydolphinscheduler/examples/task_pytorch_example.py
+++ b/src/pydolphinscheduler/examples/task_pytorch_example.py
@@ -56,5 +56,17 @@
         requirements="requirements.txt",
     )
 
+    # [start resource_limit]
+    pytorch_resources_limit = Pytorch(
+        name="pytorch_resources_limit",
+        script="main.py",
+        script_params="--dry-run --no-cuda",
+        project_path="https://github.com/pytorch/examples#mnist",
+        python_command="/home/anaconda3/envs/pytorch/bin/python3",
+        cpu_quota=1,
+        memory_max=100,
+    )
+    # [end resource_limit]
+
     workflow.submit()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/tutorial.py
index 74080b8..9519d8e 100644
--- a/src/pydolphinscheduler/examples/tutorial.py
+++ b/src/pydolphinscheduler/examples/tutorial.py
@@ -52,13 +52,22 @@
     task_child_one = Shell(name="task_child_one", command="echo 'child one'")
     task_child_two = Shell(name="task_child_two", command="echo 'child two'")
     task_union = Shell(name="task_union", command="echo union")
+
+    # [start resource_limit]
+    resource_limit = Shell(
+        name="resource_limit",
+        command="echo resource limit",
+        cpu_quota=1,
+        memory_max=100,
+    )
+    # [end resource_limit]
     # [end task_declare]
 
     # [start task_relation_declare]
     task_group = [task_child_one, task_child_two]
     task_parent.set_downstream(task_group)
 
-    task_union << task_group
+    resource_limit << task_union << task_group
     # [end task_relation_declare]
 
     # [start submit_or_run]
diff --git a/src/pydolphinscheduler/models/base.py b/src/pydolphinscheduler/models/base.py
index 2647714..007edec 100644
--- a/src/pydolphinscheduler/models/base.py
+++ b/src/pydolphinscheduler/models/base.py
@@ -43,7 +43,7 @@
         return f'<{type(self).__name__}: name="{self.name}">'
 
     def __eq__(self, other):
-        return type(self) == type(other) and all(
+        return type(self) is type(other) and all(
             getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR
         )
 
diff --git a/src/pydolphinscheduler/tasks/datax.py b/src/pydolphinscheduler/tasks/datax.py
index 1dfa89c..47b8b1f 100644
--- a/src/pydolphinscheduler/tasks/datax.py
+++ b/src/pydolphinscheduler/tasks/datax.py
@@ -20,11 +20,12 @@
 from typing import Dict, List, Optional
 
 from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.models.datasource import Datasource
 
 
-class CustomDataX(Task):
+class CustomDataX(WorkerResourceMixin, Task):
     """Task CustomDatax object, declare behavior for custom DataX task to dolphinscheduler.
 
     You provider json template for DataX, it can synchronize data according to the template you provided.
@@ -51,9 +52,10 @@
         self.custom_config = self.CUSTOM_CONFIG
         self.xms = xms
         self.xmx = xmx
+        self.add_attr(**kwargs)
 
 
-class DataX(Task):
+class DataX(WorkerResourceMixin, Task):
     """Task DataX object, declare behavior for DataX task to dolphinscheduler.
 
     It should run database datax job in multiply sql link engine, such as:
@@ -123,6 +125,7 @@
         self.post_statements = post_statements or []
         self.xms = xms
         self.xmx = xmx
+        self.add_attr(**kwargs)
 
     @property
     def source_params(self) -> Dict:
diff --git a/src/pydolphinscheduler/tasks/python.py b/src/pydolphinscheduler/tasks/python.py
index c1b2558..93cec00 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -26,13 +26,14 @@
 from stmdency.extractor import Extractor
 
 from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
 
 log = logging.getLogger(__file__)
 
 
-class Python(Task):
+class Python(WorkerResourceMixin, Task):
     """Task Python object, declare behavior for Python task to dolphinscheduler.
 
     Python task support two types of parameters for :param:``definition``, and here is an example:
@@ -67,6 +68,7 @@
     ):
         self._definition = definition
         super().__init__(name, TaskType.PYTHON, *args, **kwargs)
+        self.add_attr(**kwargs)
 
     def _build_exe_str(self) -> str:
         """Build executable string from given definition.
diff --git a/src/pydolphinscheduler/tasks/pytorch.py b/src/pydolphinscheduler/tasks/pytorch.py
index 4767f7e..8ae371f 100644
--- a/src/pydolphinscheduler/tasks/pytorch.py
+++ b/src/pydolphinscheduler/tasks/pytorch.py
@@ -19,6 +19,7 @@
 from typing import Optional
 
 from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
 from pydolphinscheduler.core.task import Task
 
 
@@ -30,7 +31,7 @@
     python_command = "${PYTHON_HOME}"
 
 
-class Pytorch(Task):
+class Pytorch(WorkerResourceMixin, Task):
     """Task Pytorch object, declare behavior for Pytorch task to dolphinscheduler.
 
     See also: `DolphinScheduler Pytorch Task Plugin
@@ -83,6 +84,7 @@
         self.python_env_tool = python_env_tool
         self.requirements = requirements
         self.conda_python_version = conda_python_version
+        self.add_attr(**kwargs)
 
     @property
     def other_params(self):
diff --git a/src/pydolphinscheduler/tasks/shell.py b/src/pydolphinscheduler/tasks/shell.py
index 36ec4e8..f6795ca 100644
--- a/src/pydolphinscheduler/tasks/shell.py
+++ b/src/pydolphinscheduler/tasks/shell.py
@@ -18,10 +18,11 @@
 """Task shell."""
 
 from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.mixin import WorkerResourceMixin
 from pydolphinscheduler.core.task import Task
 
 
-class Shell(Task):
+class Shell(WorkerResourceMixin, Task):
     """Task shell object, declare behavior for shell task to dolphinscheduler.
 
     :param name: A unique, meaningful string for the shell task.
@@ -56,3 +57,4 @@
     def __init__(self, name: str, command: str, *args, **kwargs):
         self._raw_script = command
         super().__init__(name, TaskType.SHELL, *args, **kwargs)
+        self.add_attr(**kwargs)
diff --git a/tests/tasks/test_datax.py b/tests/tasks/test_datax.py
index b174afa..aa884a6 100644
--- a/tests/tasks/test_datax.py
+++ b/tests/tasks/test_datax.py
@@ -173,3 +173,78 @@
     """Test task CustomDataX json content through the local resource plug-in."""
     custom_datax = CustomDataX(**attr)
     assert expect == getattr(custom_datax, "json")
+
+
+@pytest.mark.parametrize(
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
+@patch.object(Datasource, "get_task_usage_4j", return_value=TaskUsage(1, "MYSQL"))
+def test_datax_get_define_cpu_and_memory(mock_datasource, resource_limit):
+    """Test task datax function get_define with resource limit."""
+    code = 123
+    version = 1
+    name = "test_datax_get_define_cpu_and_memory"
+    command = "select name from test_source_table_name_resource_limit"
+    datasource_name = "test_datasource_resource_limit"
+    datatarget_name = "test_datatarget_resource_limit"
+    target_table = "test_target_table_name_resource_limit"
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        datax = DataX(
+            name,
+            datasource_name,
+            datatarget_name,
+            command,
+            target_table,
+            **resource_limit
+        )
+        assert "cpuQuota" in datax.get_define()
+        assert "memoryMax" in datax.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert datax.get_define()["cpuQuota"] == resource_limit.get("cpu_quota")
+
+        if "memoryMax" in resource_limit:
+            assert datax.get_define()["memoryMax"] == resource_limit.get("memory_max")
+
+
+@pytest.mark.parametrize(
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
+def test_custom_datax_get_define_cpu_and_memory(resource_limit):
+    """Test custom datax shell function get_define with resource limit."""
+    code = 123
+    version = 1
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        custom_datax = CustomDataX(
+            "test_custom_datax_get_define", "json_template", **resource_limit
+        )
+        assert "cpuQuota" in custom_datax.get_define()
+        assert "memoryMax" in custom_datax.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert custom_datax.get_define()["cpuQuota"] == resource_limit.get(
+                "cpu_quota"
+            )
+
+        if "memoryMax" in resource_limit:
+            assert custom_datax.get_define()["memoryMax"] == resource_limit.get(
+                "memory_max"
+            )
diff --git a/tests/tasks/test_python.py b/tests/tasks/test_python.py
index 516513a..e842f90 100644
--- a/tests/tasks/test_python.py
+++ b/tests/tasks/test_python.py
@@ -178,3 +178,33 @@
     """Test task Python definition content through the local resource plug-in."""
     python = Python(**attr)
     assert expect == getattr(python, "definition")
+
+
+@pytest.mark.parametrize(
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
+def test_python_get_define_cpu_and_memory(resource_limit):
+    """Test task python function get_define with resource limit."""
+    code = 123
+    version = 1
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        python = Python(
+            name="task", definition="print('hello world.')", **resource_limit
+        )
+        assert "cpuQuota" in python.get_define()
+        assert "memoryMax" in python.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert python.get_define()["cpuQuota"] == resource_limit.get("cpu_quota")
+
+        if "memoryMax" in resource_limit:
+            assert python.get_define()["memoryMax"] == resource_limit.get("memory_max")
diff --git a/tests/tasks/test_pytorch.py b/tests/tasks/test_pytorch.py
index c04723e..66c1664 100644
--- a/tests/tasks/test_pytorch.py
+++ b/tests/tasks/test_pytorch.py
@@ -122,3 +122,31 @@
             python_command=python_command,
         )
         assert task.other_params == expect
+
+
+@pytest.mark.parametrize(
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
+def test_pytorch_get_define_cpu_and_memory(resource_limit):
+    """Test task pytorch function get_define with resource limit."""
+    code = 123
+    version = 1
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        pytorch = Pytorch(name="test", script="", script_params="", **resource_limit)
+        assert "cpuQuota" in pytorch.get_define()
+        assert "memoryMax" in pytorch.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert pytorch.get_define()["cpuQuota"] == resource_limit.get("cpu_quota")
+
+        if "memoryMax" in resource_limit:
+            assert pytorch.get_define()["memoryMax"] == resource_limit.get("memory_max")
diff --git a/tests/tasks/test_shell.py b/tests/tasks/test_shell.py
index 3932c08..e768a2d 100644
--- a/tests/tasks/test_shell.py
+++ b/tests/tasks/test_shell.py
@@ -112,3 +112,33 @@
     """Test task shell task command content through the local resource plug-in."""
     task = Shell(**attr)
     assert expect == getattr(task, "raw_script")
+
+
+@pytest.mark.parametrize(
+    "resource_limit",
+    [
+        {"cpu_quota": 1, "memory_max": 10},
+        {"memory_max": 15},
+        {},
+    ],
+)
+def test_shell_get_define_cpu_and_memory(resource_limit):
+    """Test task shell function get_define with resource limit."""
+    code = 123
+    version = 1
+    name = "test_shell_get_define_cpu_and_memory"
+    command = "echo test shell with resource limit"
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        shell = Shell(name, command, **resource_limit)
+        assert "cpuQuota" in shell.get_define()
+        assert "memoryMax" in shell.get_define()
+
+        if "cpuQuota" in resource_limit:
+            assert shell.get_define()["cpuQuota"] == resource_limit.get("cpu_quota")
+
+        if "memoryMax" in resource_limit:
+            assert shell.get_define()["memoryMax"] == resource_limit.get("memory_max")
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 998f711..07a3249 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -24,7 +24,6 @@
 task_without_example = {
     "http",
     "sub_workflow",
-    "python",
     "procedure",
 }