blob: d15ace5547f0131b4f2b63c8da4f7d86ef516188 [file] [log] [blame]
<!doctype html>
<!-- Declare the document language so assistive technology and search engines can identify it. -->
<html lang="en">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" />
<title>pulsar API documentation</title>
<meta name="description" content="The Pulsar Python client library is based on the existing C++ client library.
All the same features ..." />
<!-- Load the web font over HTTPS so it is not blocked as mixed content when the page is served over HTTPS. -->
<link href="https://fonts.googleapis.com/css?family=Source+Sans+Pro:400,300" rel="stylesheet">
<style type="text/css">
* {
box-sizing: border-box;
}
/*! normalize.css v1.1.1 | MIT License | git.io/normalize */
/* ==========================================================================
HTML5 display definitions
========================================================================== */
/**
* Correct `block` display not defined in IE 6/7/8/9 and Firefox 3.
*/
article,
aside,
details,
figcaption,
figure,
footer,
header,
hgroup,
main,
nav,
section,
summary {
display: block;
}
/**
* Correct `inline-block` display not defined in IE 6/7/8/9 and Firefox 3.
*/
audio,
canvas,
video {
display: inline-block;
*display: inline;
*zoom: 1;
}
/**
* Prevent modern browsers from displaying `audio` without controls.
* Remove excess height in iOS 5 devices.
*/
audio:not([controls]) {
display: none;
height: 0;
}
/**
* Address styling not present in IE 7/8/9, Firefox 3, and Safari 4.
* Known issue: no IE 6 support.
*/
[hidden] {
display: none;
}
/* ==========================================================================
Base
========================================================================== */
/**
* 1. Prevent system color scheme's background color being used in Firefox, IE,
* and Opera.
* 2. Prevent system color scheme's text color being used in Firefox, IE, and
* Opera.
* 3. Correct text resizing oddly in IE 6/7 when body `font-size` is set using
* `em` units.
* 4. Prevent iOS text size adjust after orientation change, without disabling
* user zoom.
*/
html {
background: #fff; /* 1 */
color: #000; /* 2 */
font-size: 100%; /* 3 */
-webkit-text-size-adjust: 100%; /* 4 */
-ms-text-size-adjust: 100%; /* 4 */
}
/**
* Address `font-family` inconsistency between `textarea` and other form
* elements.
*/
html,
button,
input,
select,
textarea {
font-family: sans-serif;
}
/**
* Address margins handled incorrectly in IE 6/7.
*/
body {
margin: 0;
}
/* ==========================================================================
Links
========================================================================== */
/**
* Address `outline` inconsistency between Chrome and other browsers.
*/
a:focus {
outline: thin dotted;
}
/**
* Improve readability when focused and also mouse hovered in all browsers.
*/
a:active,
a:hover {
outline: 0;
}
/* ==========================================================================
Typography
========================================================================== */
/**
* Address font sizes and margins set differently in IE 6/7.
* Address font sizes within `section` and `article` in Firefox 4+, Safari 5,
* and Chrome.
*/
h1 {
font-size: 2em;
margin: 0.67em 0;
}
h2 {
font-size: 1.5em;
margin: 0.83em 0;
}
h3 {
font-size: 1.17em;
margin: 1em 0;
}
h4 {
font-size: 1em;
margin: 1.33em 0;
}
h5 {
font-size: 0.83em;
margin: 1.67em 0;
}
h6 {
font-size: 0.67em;
margin: 2.33em 0;
}
/**
* Address styling not present in IE 7/8/9, Safari 5, and Chrome.
*/
abbr[title] {
border-bottom: 1px dotted;
}
/**
* Address style set to `bolder` in Firefox 3+, Safari 4/5, and Chrome.
*/
b,
strong {
font-weight: bold;
}
blockquote {
margin: 1em 40px;
}
/**
* Address styling not present in Safari 5 and Chrome.
*/
dfn {
font-style: italic;
}
/**
* Address differences between Firefox and other browsers.
* Known issue: no IE 6/7 normalization.
*/
hr {
-moz-box-sizing: content-box;
box-sizing: content-box;
height: 0;
}
/**
* Address styling not present in IE 6/7/8/9.
*/
mark {
background: #ff0;
color: #000;
}
/**
* Address margins set differently in IE 6/7.
*/
p,
pre {
margin: 1em 0;
}
/**
* Correct font family set oddly in IE 6, Safari 4/5, and Chrome.
*/
code,
kbd,
pre,
samp {
font-family: monospace, serif;
_font-family: 'courier new', monospace;
font-size: 1em;
}
/**
* Improve readability of pre-formatted text in all browsers.
*/
pre {
white-space: pre;
white-space: pre-wrap;
word-wrap: break-word;
}
/**
* Address CSS quotes not supported in IE 6/7.
*/
q {
quotes: none;
}
/**
* Address `quotes` property not supported in Safari 4.
*/
q:before,
q:after {
content: '';
content: none;
}
/**
* Address inconsistent and variable font size in all browsers.
*/
small {
font-size: 80%;
}
/**
* Prevent `sub` and `sup` affecting `line-height` in all browsers.
*/
sub,
sup {
font-size: 75%;
line-height: 0;
position: relative;
vertical-align: baseline;
}
sup {
top: -0.5em;
}
sub {
bottom: -0.25em;
}
/* ==========================================================================
Lists
========================================================================== */
/**
* Address margins set differently in IE 6/7.
*/
dl,
menu,
ol,
ul {
margin: 1em 0;
}
dd {
margin: 0 0 0 40px;
}
/**
* Address paddings set differently in IE 6/7.
*/
menu,
ol,
ul {
padding: 0 0 0 40px;
}
/**
* Correct list images handled incorrectly in IE 7.
*/
nav ul,
nav ol {
list-style: none;
list-style-image: none;
}
/* ==========================================================================
Embedded content
========================================================================== */
/**
* 1. Remove border when inside `a` element in IE 6/7/8/9 and Firefox 3.
* 2. Improve image quality when scaled in IE 7.
*/
img {
border: 0; /* 1 */
-ms-interpolation-mode: bicubic; /* 2 */
}
/**
* Correct overflow displayed oddly in IE 9.
*/
svg:not(:root) {
overflow: hidden;
}
/* ==========================================================================
Figures
========================================================================== */
/**
* Address margin not present in IE 6/7/8/9, Safari 5, and Opera 11.
*/
figure {
margin: 0;
}
/* ==========================================================================
Forms
========================================================================== */
/**
* Correct margin displayed oddly in IE 6/7.
*/
form {
margin: 0;
}
/**
* Define consistent border, margin, and padding.
*/
fieldset {
border: 1px solid #c0c0c0;
margin: 0 2px;
padding: 0.35em 0.625em 0.75em;
}
/**
* 1. Correct color not being inherited in IE 6/7/8/9.
* 2. Correct text not wrapping in Firefox 3.
* 3. Correct alignment displayed oddly in IE 6/7.
*/
legend {
border: 0; /* 1 */
padding: 0;
white-space: normal; /* 2 */
*margin-left: -7px; /* 3 */
}
/**
* 1. Correct font size not being inherited in all browsers.
* 2. Address margins set differently in IE 6/7, Firefox 3+, Safari 5,
* and Chrome.
* 3. Improve appearance and consistency in all browsers.
*/
button,
input,
select,
textarea {
font-size: 100%; /* 1 */
margin: 0; /* 2 */
vertical-align: baseline; /* 3 */
*vertical-align: middle; /* 3 */
}
/**
* Address Firefox 3+ setting `line-height` on `input` using `!important` in
* the UA stylesheet.
*/
button,
input {
line-height: normal;
}
/**
* Address inconsistent `text-transform` inheritance for `button` and `select`.
* All other form control elements do not inherit `text-transform` values.
* Correct `button` style inheritance in Chrome, Safari 5+, and IE 6+.
* Correct `select` style inheritance in Firefox 4+ and Opera.
*/
button,
select {
text-transform: none;
}
/**
* 1. Avoid the WebKit bug in Android 4.0.* where (2) destroys native `audio`
* and `video` controls.
* 2. Correct inability to style clickable `input` types in iOS.
* 3. Improve usability and consistency of cursor style between image-type
* `input` and others.
* 4. Remove inner spacing in IE 7 without affecting normal text inputs.
* Known issue: inner spacing remains in IE 6.
*/
button,
html input[type="button"], /* 1 */
input[type="reset"],
input[type="submit"] {
-webkit-appearance: button; /* 2 */
cursor: pointer; /* 3 */
*overflow: visible; /* 4 */
}
/**
* Re-set default cursor for disabled elements.
*/
button[disabled],
html input[disabled] {
cursor: default;
}
/**
* 1. Address box sizing set to content-box in IE 8/9.
* 2. Remove excess padding in IE 8/9.
* 3. Remove excess padding in IE 7.
* Known issue: excess padding remains in IE 6.
*/
input[type="checkbox"],
input[type="radio"] {
box-sizing: border-box; /* 1 */
padding: 0; /* 2 */
*height: 13px; /* 3 */
*width: 13px; /* 3 */
}
/**
* 1. Address `appearance` set to `searchfield` in Safari 5 and Chrome.
* 2. Address `box-sizing` set to `border-box` in Safari 5 and Chrome
* (include `-moz` to future-proof).
*/
input[type="search"] {
-webkit-appearance: textfield; /* 1 */
-moz-box-sizing: content-box;
-webkit-box-sizing: content-box; /* 2 */
box-sizing: content-box;
}
/**
* Remove inner padding and search cancel button in Safari 5 and Chrome
* on OS X.
*/
input[type="search"]::-webkit-search-cancel-button,
input[type="search"]::-webkit-search-decoration {
-webkit-appearance: none;
}
/**
* Remove inner padding and border in Firefox 3+.
*/
button::-moz-focus-inner,
input::-moz-focus-inner {
border: 0;
padding: 0;
}
/**
* 1. Remove default vertical scrollbar in IE 6/7/8/9.
* 2. Improve readability and alignment in all browsers.
*/
textarea {
overflow: auto; /* 1 */
vertical-align: top; /* 2 */
}
/* ==========================================================================
Tables
========================================================================== */
/**
* Remove most spacing between table cells.
*/
table {
border-collapse: collapse;
border-spacing: 0;
}
</style>
<style type="text/css">
html, body {
margin: 0;
padding: 0;
min-height: 100%;
}
body {
  background: #fff;
  /* Fallback stack: "Helvetica Neue" was misspelled, and the generic family
     keyword is `sans-serif` (a bare `sans` is treated as a font name). */
  font-family: "Source Sans Pro", "Helvetica Neue", Helvetica, sans-serif;
  font-weight: 300;
  font-size: 16px;
  line-height: 1.6em;
}
#content {
width: 70%;
max-width: 850px;
float: left;
padding: 30px 60px;
border-left: 1px solid #ddd;
}
#sidebar {
width: 25%;
float: left;
padding: 30px;
overflow: hidden;
}
#nav {
font-size: 130%;
margin: 0 0 15px 0;
}
#top {
display: block;
position: fixed;
bottom: 5px;
left: 5px;
font-size: .85em;
text-transform: uppercase;
}
#footer {
font-size: .75em;
padding: 5px 30px;
border-top: 1px solid #ddd;
text-align: right;
}
#footer p {
margin: 0 0 0 30px;
display: inline-block;
}
h1, h2, h3, h4, h5 {
font-weight: 300;
}
h1 {
font-size: 2.5em;
line-height: 1.1em;
margin: 0 0 .50em 0;
}
h2 {
font-size: 1.75em;
margin: 1em 0 .50em 0;
}
h3 {
margin: 25px 0 10px 0;
}
h4 {
margin: 0;
font-size: 105%;
}
a {
color: #058;
text-decoration: none;
transition: color .3s ease-in-out;
}
a:hover {
color: #e08524;
transition: color .3s ease-in-out;
}
pre, code, .mono, .name {
font-family: "Ubuntu Mono", "Cousine", "DejaVu Sans Mono", monospace;
}
.title .name {
font-weight: bold;
}
.section-title {
margin-top: 2em;
}
.ident {
color: #900;
}
code {
background: #f9f9f9;
}
pre {
background: #fefefe;
border: 1px solid #ddd;
box-shadow: 2px 2px 0 #f3f3f3;
margin: 0 30px;
padding: 15px 30px;
}
.codehilite {
margin: 0 30px 10px 30px;
}
.codehilite pre {
margin: 0;
}
.codehilite .err { background: #ff3300; color: #fff !important; }
table#module-list {
font-size: 110%;
}
table#module-list tr td:first-child {
padding-right: 10px;
white-space: nowrap;
}
table#module-list td {
vertical-align: top;
padding-bottom: 8px;
}
table#module-list td p {
margin: 0 0 7px 0;
}
.def {
display: table;
}
.def p {
display: table-cell;
vertical-align: top;
text-align: left;
}
.def p:first-child {
white-space: nowrap;
}
.def p:last-child {
width: 100%;
}
#index {
list-style-type: none;
margin: 0;
padding: 0;
}
ul#index .class_name {
/* font-size: 110%; */
font-weight: bold;
}
#index ul {
margin: 0;
}
.item {
margin: 0 0 15px 0;
}
.item .class {
margin: 0 0 25px 30px;
}
.item .class ul.class_list {
margin: 0 0 20px 0;
}
.item .name {
background: #fafafa;
margin: 0;
font-weight: bold;
padding: 5px 10px;
border-radius: 3px;
display: inline-block;
min-width: 40%;
}
.item .name:hover {
background: #f6f6f6;
}
.item .empty_desc {
margin: 0 0 5px 0;
padding: 0;
}
.item .inheritance {
margin: 3px 0 0 30px;
}
.item .inherited {
color: #666;
}
.item .desc {
padding: 0 8px;
margin: 0;
}
.item .desc p {
margin: 0 0 10px 0;
}
.source_cont {
margin: 0;
padding: 0;
}
.source_link a {
background: #ffc300;
font-weight: 400;
font-size: .75em;
text-transform: uppercase;
color: #fff;
text-shadow: 1px 1px 0 #f4b700;
padding: 3px 8px;
border-radius: 2px;
transition: background .3s ease-in-out;
}
.source_link a:hover {
background: #FF7200;
text-shadow: none;
transition: background .3s ease-in-out;
}
.source {
display: none;
max-height: 600px;
overflow-y: scroll;
margin-bottom: 15px;
}
.source .codehilite {
margin: 0;
}
.desc h1, .desc h2, .desc h3 {
font-size: 100% !important;
}
.clear {
clear: both;
}
@media all and (max-width: 950px) {
#sidebar {
width: 35%;
}
#content {
width: 65%;
}
}
@media all and (max-width: 650px) {
#top {
display: none;
}
#sidebar {
float: none;
width: auto;
}
#content {
float: none;
width: auto;
padding: 30px;
}
#index ul {
padding: 0;
margin-bottom: 15px;
}
#index ul li {
display: inline-block;
margin-right: 30px;
}
#footer {
text-align: left;
}
#footer p {
display: block;
margin: inherit;
}
}
/*****************************/
</style>
<style type="text/css">
/* ==========================================================================
EXAMPLE Media Queries for Responsive Design.
These examples override the primary ('mobile first') styles.
Modify as content requires.
========================================================================== */
@media only screen and (min-width: 35em) {
/* Style adjustments for viewports that meet the condition */
}
@media print,
(-o-min-device-pixel-ratio: 5/4),
(-webkit-min-device-pixel-ratio: 1.25),
(min-resolution: 120dpi) {
/* Style adjustments for high resolution devices */
}
/* ==========================================================================
Print styles.
Inlined to avoid required HTTP connection: h5bp.com/r
========================================================================== */
@media print {
* {
background: transparent !important;
color: #000 !important; /* Black prints faster: h5bp.com/s */
box-shadow: none !important;
text-shadow: none !important;
}
a,
a:visited {
text-decoration: underline;
}
a[href]:after {
content: " (" attr(href) ")";
}
abbr[title]:after {
content: " (" attr(title) ")";
}
/*
* Don't show links for images, or javascript/internal links
*/
.ir a:after,
a[href^="javascript:"]:after,
a[href^="#"]:after {
content: "";
}
pre,
blockquote {
border: 1px solid #999;
page-break-inside: avoid;
}
thead {
display: table-header-group; /* h5bp.com/t */
}
tr,
img {
page-break-inside: avoid;
}
img {
max-width: 100% !important;
}
@page {
margin: 0.5cm;
}
p,
h2,
h3 {
orphans: 3;
widows: 3;
}
h2,
h3 {
page-break-after: avoid;
}
}
</style>
<script type="text/javascript">
/**
 * Show or hide the element with the given id and update the label of the
 * link that triggered the toggle.
 *
 * @param {string} id - Id of the element to show or hide.
 * @param {Element} $link - The clicked link; its innerHTML is rewritten to
 *   "Hide source" or "Show source" to match the new state.
 */
function toggle(id, $link) {
  // Declared locally: assigning to an undeclared `$node` leaked a global
  // variable and throws a ReferenceError in strict mode.
  var node = document.getElementById(id);
  if (!node) {
    return;
  }
  // An empty inline `display` is treated as hidden, because the `.source`
  // stylesheet rule hides these blocks by default.
  if (!node.style.display || node.style.display === 'none') {
    node.style.display = 'block';
    $link.innerHTML = 'Hide source &nequiv;';
  } else {
    node.style.display = 'none';
    $link.innerHTML = 'Show source &equiv;';
  }
}
</script>
</head>
<body>
<a href="#" id="top">Top</a>
<div id="container">
<div id="sidebar">
<h1>Index</h1>
<ul id="index">
<li class="set"><h3><a href="#header-classes">Classes</a></h3>
<ul>
<li class="mono">
<span class="class_name"><a href="#pulsar.Authentication">Authentication</a></span>
<ul>
<li class="mono"><a href="#pulsar.Authentication.__init__">__init__</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.AuthenticationAthenz">AuthenticationAthenz</a></span>
<ul>
<li class="mono"><a href="#pulsar.AuthenticationAthenz.__init__">__init__</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.AuthenticationOauth2">AuthenticationOauth2</a></span>
<ul>
<li class="mono"><a href="#pulsar.AuthenticationOauth2.__init__">__init__</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.AuthenticationTLS">AuthenticationTLS</a></span>
<ul>
<li class="mono"><a href="#pulsar.AuthenticationTLS.__init__">__init__</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.AuthenticationToken">AuthenticationToken</a></span>
<ul>
<li class="mono"><a href="#pulsar.AuthenticationToken.__init__">__init__</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.Client">Client</a></span>
<ul>
<li class="mono"><a href="#pulsar.Client.__init__">__init__</a></li>
<li class="mono"><a href="#pulsar.Client.close">close</a></li>
<li class="mono"><a href="#pulsar.Client.create_producer">create_producer</a></li>
<li class="mono"><a href="#pulsar.Client.create_reader">create_reader</a></li>
<li class="mono"><a href="#pulsar.Client.get_topic_partitions">get_topic_partitions</a></li>
<li class="mono"><a href="#pulsar.Client.shutdown">shutdown</a></li>
<li class="mono"><a href="#pulsar.Client.subscribe">subscribe</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.Consumer">Consumer</a></span>
<ul>
<li class="mono"><a href="#pulsar.Consumer.acknowledge">acknowledge</a></li>
<li class="mono"><a href="#pulsar.Consumer.acknowledge_cumulative">acknowledge_cumulative</a></li>
<li class="mono"><a href="#pulsar.Consumer.close">close</a></li>
<li class="mono"><a href="#pulsar.Consumer.is_connected">is_connected</a></li>
<li class="mono"><a href="#pulsar.Consumer.negative_acknowledge">negative_acknowledge</a></li>
<li class="mono"><a href="#pulsar.Consumer.pause_message_listener">pause_message_listener</a></li>
<li class="mono"><a href="#pulsar.Consumer.receive">receive</a></li>
<li class="mono"><a href="#pulsar.Consumer.redeliver_unacknowledged_messages">redeliver_unacknowledged_messages</a></li>
<li class="mono"><a href="#pulsar.Consumer.resume_message_listener">resume_message_listener</a></li>
<li class="mono"><a href="#pulsar.Consumer.seek">seek</a></li>
<li class="mono"><a href="#pulsar.Consumer.subscription_name">subscription_name</a></li>
<li class="mono"><a href="#pulsar.Consumer.topic">topic</a></li>
<li class="mono"><a href="#pulsar.Consumer.unsubscribe">unsubscribe</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.CryptoKeyReader">CryptoKeyReader</a></span>
<ul>
<li class="mono"><a href="#pulsar.CryptoKeyReader.__init__">__init__</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.Message">Message</a></span>
<ul>
<li class="mono"><a href="#pulsar.Message.data">data</a></li>
<li class="mono"><a href="#pulsar.Message.event_timestamp">event_timestamp</a></li>
<li class="mono"><a href="#pulsar.Message.message_id">message_id</a></li>
<li class="mono"><a href="#pulsar.Message.partition_key">partition_key</a></li>
<li class="mono"><a href="#pulsar.Message.properties">properties</a></li>
<li class="mono"><a href="#pulsar.Message.publish_timestamp">publish_timestamp</a></li>
<li class="mono"><a href="#pulsar.Message.redelivery_count">redelivery_count</a></li>
<li class="mono"><a href="#pulsar.Message.schema_version">schema_version</a></li>
<li class="mono"><a href="#pulsar.Message.topic_name">topic_name</a></li>
<li class="mono"><a href="#pulsar.Message.value">value</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.MessageBatch">MessageBatch</a></span>
<ul>
<li class="mono"><a href="#pulsar.MessageBatch.__init__">__init__</a></li>
<li class="mono"><a href="#pulsar.MessageBatch.parse_from">parse_from</a></li>
<li class="mono"><a href="#pulsar.MessageBatch.with_message_id">with_message_id</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.MessageId">MessageId</a></span>
<ul>
<li class="mono"><a href="#pulsar.MessageId.deserialize">deserialize</a></li>
<li class="mono"><a href="#pulsar.MessageId.__init__">__init__</a></li>
<li class="mono"><a href="#pulsar.MessageId.batch_index">batch_index</a></li>
<li class="mono"><a href="#pulsar.MessageId.entry_id">entry_id</a></li>
<li class="mono"><a href="#pulsar.MessageId.ledger_id">ledger_id</a></li>
<li class="mono"><a href="#pulsar.MessageId.partition">partition</a></li>
<li class="mono"><a href="#pulsar.MessageId.serialize">serialize</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.Producer">Producer</a></span>
<ul>
<li class="mono"><a href="#pulsar.Producer.close">close</a></li>
<li class="mono"><a href="#pulsar.Producer.flush">flush</a></li>
<li class="mono"><a href="#pulsar.Producer.is_connected">is_connected</a></li>
<li class="mono"><a href="#pulsar.Producer.last_sequence_id">last_sequence_id</a></li>
<li class="mono"><a href="#pulsar.Producer.producer_name">producer_name</a></li>
<li class="mono"><a href="#pulsar.Producer.send">send</a></li>
<li class="mono"><a href="#pulsar.Producer.send_async">send_async</a></li>
<li class="mono"><a href="#pulsar.Producer.topic">topic</a></li>
</ul>
</li>
<li class="mono">
<span class="class_name"><a href="#pulsar.Reader">Reader</a></span>
<ul>
<li class="mono"><a href="#pulsar.Reader.close">close</a></li>
<li class="mono"><a href="#pulsar.Reader.has_message_available">has_message_available</a></li>
<li class="mono"><a href="#pulsar.Reader.is_connected">is_connected</a></li>
<li class="mono"><a href="#pulsar.Reader.read_next">read_next</a></li>
<li class="mono"><a href="#pulsar.Reader.seek">seek</a></li>
<li class="mono"><a href="#pulsar.Reader.topic">topic</a></li>
</ul>
</li>
</ul>
</li>
<li class="set"><h3><a href="#header-submodules">Sub-modules</a></h3>
<ul>
<li class="mono"><a href="exceptions.m.html">pulsar.exceptions</a></li>
<li class="mono"><a href="functions/index.html">pulsar.functions</a></li>
<li class="mono"><a href="schema/index.html">pulsar.schema</a></li>
</ul>
</li>
</ul>
</div>
<article id="content">
<header id="section-intro">
<h1 class="title"><span class="name">pulsar</span> module</h1>
<p>The Pulsar Python client library is based on the existing C++ client library.
All the same features are exposed through the Python interface.</p>
<p>Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.</p>
<h2>Install from PyPI</h2>
<p>Download Python wheel binary files for MacOS and Linux
directly from the PyPI archive.</p>
<pre><code>#!shell
$ sudo pip install pulsar-client
</code></pre>
<h2>Install from sources</h2>
<p>Follow the instructions to compile the Pulsar C++ client library. This method
will also build the Python binding for the library.</p>
<p>To install the Python bindings:</p>
<pre><code>#!shell
$ cd pulsar-client-cpp/python
$ sudo python setup.py install
</code></pre>
<h2>Examples</h2>
<h3><a href="#pulsar.Producer">Producer</a> example</h3>
<pre><code>#!python
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
producer.send(('Hello-%d' % i).encode('utf-8'))
client.close()
</code></pre>
<h3><a href="#pulsar.Consumer">Consumer</a> example</h3>
<pre><code>#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')

while True:
    msg = consumer.receive()
    try:
        # Interpolate with `%`; extra print() arguments are not substituted
        # into the format string.
        print("Received message '%s' id='%s'" % (msg.data().decode('utf-8'), msg.message_id()))
        consumer.acknowledge(msg)
    except Exception:
        # Processing failed: negatively acknowledge the message.
        consumer.negative_acknowledge(msg)

client.close()
</code></pre>
<h3><a href="#pulsar.Producer.send_async">Async producer</a> example</h3>
<pre><code>#!python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
    'my-topic',
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10
)


def send_callback(res, msg_id):
    print('Message published res=%s' % res)


# Counter used to build a distinct payload for every message.
i = 0
while True:
    producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)
    i += 1

client.close()
</code></pre>
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar', this);">Show source &equiv;</a></p>
<div id="source-pulsar" class="source">
<pre><code>#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
The Pulsar Python client library is based on the existing C++ client library.
All the same features are exposed through the Python interface.
Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.
## Install from PyPI
Download Python wheel binary files for MacOS and Linux
directly from the PyPI archive.
#!shell
$ sudo pip install pulsar-client
## Install from sources
Follow the instructions to compile the Pulsar C++ client library. This method
will also build the Python binding for the library.
To install the Python bindings:
#!shell
$ cd pulsar-client-cpp/python
$ sudo python setup.py install
## Examples
### [Producer](#pulsar.Producer) example
#!python
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
producer.send(('Hello-%d' % i).encode('utf-8'))
client.close()
#### [Consumer](#pulsar.Consumer) Example
#!python
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')
    while True:
        msg = consumer.receive()
        try:
            print("Received message '%s' id='%s'" % (msg.data().decode('utf-8'), msg.message_id()))
            consumer.acknowledge(msg)
        except Exception:
            consumer.negative_acknowledge(msg)
    client.close()
### [Async producer](#pulsar.Producer.send_async) example
#!python
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
'my-topic',
block_if_queue_full=True,
batching_enabled=True,
batching_max_publish_delay_ms=10
)
    def send_callback(res, msg_id):
        print('Message published res=%s' % res)
    i = 0
    while True:
        producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)
        i += 1
    client.close()
"""
import logging
import _pulsar
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType # noqa: F401
from pulsar.exceptions import *
from pulsar.functions.function import Function
from pulsar.functions.context import Context
from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
from pulsar import schema
_schema = schema
import re
_retype = type(re.compile('x'))
import certifi
from datetime import timedelta
class MessageId:
    """
    Identifier of a single message. Thin wrapper around a native
    `_pulsar.MessageId`, which is kept in `self._msg_id`.
    """

    def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
        """
        Build the id from its four components; each one defaults to -1.
        """
        native_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
        self._msg_id = native_id

    'Represents the earliest message stored in a topic'
    earliest = _pulsar.MessageId.earliest

    'Represents the latest message published on a topic'
    latest = _pulsar.MessageId.latest

    def ledger_id(self):
        """Return the ledger id reported by the wrapped native id."""
        return self._msg_id.ledger_id()

    def entry_id(self):
        """Return the entry id reported by the wrapped native id."""
        return self._msg_id.entry_id()

    def batch_index(self):
        """Return the batch index reported by the wrapped native id."""
        return self._msg_id.batch_index()

    def partition(self):
        """Return the partition reported by the wrapped native id."""
        return self._msg_id.partition()

    def serialize(self):
        """
        Return the message id as a bytes sequence that can be stored and
        later passed to `deserialize`.
        """
        return self._msg_id.serialize()

    @staticmethod
    def deserialize(message_id_bytes):
        """
        Rebuild a message id from a bytes sequence produced by `serialize`.

        The result is returned exactly as produced by
        `_pulsar.MessageId.deserialize`.
        """
        return _pulsar.MessageId.deserialize(message_id_bytes)
class Message:
    """
    A message handed to the application by a consumer, either as the result
    of calling `receive` or through a listener.

    Every accessor delegates to the native message stored in `self._message`.
    """

    def data(self):
        """
        Return the payload of the message as bytes.
        """
        native = self._message
        return native.data()

    def value(self):
        """
        Return the message content de-serialized by the schema stored in
        `self._schema`.
        """
        decode = self._schema.decode
        return decode(self._message.data())

    def properties(self):
        """
        Return the properties attached to the message. Properties are
        application-defined key/value pairs.
        """
        native = self._message
        return native.properties()

    def partition_key(self):
        """
        Return the partitioning key of the message.
        """
        native = self._message
        return native.partition_key()

    def publish_timestamp(self):
        """
        Return the publish time of the message as a timestamp in milliseconds.
        """
        native = self._message
        return native.publish_timestamp()

    def event_timestamp(self):
        """
        Return the event time of the message as a timestamp in milliseconds.
        """
        native = self._message
        return native.event_timestamp()

    def message_id(self):
        """
        Return the message ID that can be used to refer to this particular
        message.
        """
        native = self._message
        return native.message_id()

    def topic_name(self):
        """
        Return the name of the topic from which this message originated.
        """
        native = self._message
        return native.topic_name()

    def redelivery_count(self):
        """
        Return the redelivery count of this message.
        """
        native = self._message
        return native.redelivery_count()

    def schema_version(self):
        """
        Return the schema version of this message.
        """
        native = self._message
        return native.schema_version()

    @staticmethod
    def _wrap(_message):
        # Build an empty wrapper and attach the native message to it.
        # No schema is attached here.
        wrapped = Message()
        wrapped._message = _message
        return wrapped
class MessageBatch:
    """
    Wrapper around a native `_pulsar.MessageBatch`, kept in `self._msg_batch`.
    """

    def __init__(self):
        """Create the wrapper around a new, empty native batch."""
        self._msg_batch = _pulsar.MessageBatch()

    def with_message_id(self, msg_id):
        """
        Pass a message id to the native batch.

        `msg_id` may be a native `_pulsar.MessageId` or a `MessageId` wrapper,
        which is unwrapped first. Any other type raises `TypeError`.

        Returns `self` so that calls can be chained.
        """
        if isinstance(msg_id, _pulsar.MessageId):
            native_id = msg_id
        elif isinstance(msg_id, MessageId):
            native_id = msg_id._msg_id
        else:
            raise TypeError("unknown message id type")
        self._msg_batch.with_message_id(native_id)
        return self

    def parse_from(self, data, size):
        """
        Have the native batch parse `data` / `size`, then return its messages
        as a list of `Message` wrappers.
        """
        self._msg_batch.parse_from(data, size)
        return [Message._wrap(raw) for raw in self._msg_batch.messages()]
class Authentication:
    """
    Authentication provider that is loaded from an external shared library.
    """

    def __init__(self, dynamicLibPath, authParamsString):
        """
        Create the authentication provider instance.

        **Args**

        * `dynamicLibPath`: Path to the authentication provider shared library
          (such as `tls.so`)
        * `authParamsString`: Comma-separated list of provider-specific
          configuration params
        """
        # Both arguments must be strings; they are checked in declaration order.
        for value, label in ((dynamicLibPath, 'dynamicLibPath'),
                             (authParamsString, 'authParamsString')):
            _check_type(str, value, label)
        self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
class AuthenticationTLS(Authentication):
    """
    TLS Authentication implementation
    """

    def __init__(self, certificate_path, private_key_path):
        """
        Create the TLS authentication provider instance.

        **Args**

        * `certificate_path`: Path to the public certificate
        * `private_key_path`: Path to private TLS key
        """
        _check_type(str, certificate_path, 'certificate_path')
        _check_type(str, private_key_path, 'private_key_path')
        # The native provider is stored directly on `self.auth`; the base-class
        # initializer is not invoked.
        self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
class AuthenticationToken(Authentication):
    """
    Token based authentication implementation
    """

    def __init__(self, token):
        """
        Create the token authentication provider instance.

        **Args**

        * `token`: A string containing the token, or a function that provides
          a string with the token

        Raises `ValueError` when `token` is neither a `str` nor callable.
        """
        if not isinstance(token, str) and not callable(token):
            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
        native_provider = _pulsar.AuthenticationToken(token)
        self.auth = native_provider
class AuthenticationAthenz(Authentication):
    """
    Athenz Authentication implementation
    """

    def __init__(self, auth_params_string):
        """
        Create the Athenz authentication provider instance.

        **Args**

        * `auth_params_string`: JSON encoded configuration for Athenz client
        """
        # The configuration must be passed as a string.
        _check_type(str, auth_params_string, 'auth_params_string')
        native_provider = _pulsar.AuthenticationAthenz(auth_params_string)
        self.auth = native_provider
class AuthenticationOauth2(Authentication):
    """
    Authentication provider for Oauth2.
    """

    def __init__(self, auth_params_string):
        """
        Build the Oauth2 authentication provider instance.

        **Args**

        * `auth_params_string`: JSON encoded configuration for Oauth2 client
        """
        _check_type(str, auth_params_string, 'auth_params_string')
        provider = _pulsar.AuthenticationOauth2(auth_params_string)
        self.auth = provider
class Client:
    """
    The Pulsar client. A single client instance can be used to create producers
    and consumers on multiple topics.

    The client will share the same connection pool and threads across all
    producers and consumers.
    """

    def __init__(self, service_url,
                 authentication=None,
                 operation_timeout_seconds=30,
                 io_threads=1,
                 message_listener_threads=1,
                 concurrent_lookup_requests=50000,
                 log_conf_file_path=None,
                 use_tls=False,
                 tls_trust_certs_file_path=None,
                 tls_allow_insecure_connection=False,
                 tls_validate_hostname=False,
                 logger=None,
                 connection_timeout_ms=10000,
                 ):
        """
        Create a new Pulsar client instance.

        **Args**

        * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/

        **Options**

        * `authentication`:
          Set the authentication provider to be used with the broker. For example:
          `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
        * `operation_timeout_seconds`:
          Set timeout on client operations (subscribe, create producer, close,
          unsubscribe).
        * `io_threads`:
          Set the number of IO threads to be used by the Pulsar client.
        * `message_listener_threads`:
          Set the number of threads to be used by the Pulsar client when
          delivering messages through message listener. The default is 1 thread
          per Pulsar client. If using more than 1 thread, messages for distinct
          `message_listener`s will be delivered in different threads, however a
          single `MessageListener` will always be assigned to the same thread.
        * `concurrent_lookup_requests`:
          Number of concurrent lookup-requests allowed on each broker connection
          to prevent overload on the broker.
        * `log_conf_file_path`:
          Initialize log4cxx from a configuration file.
        * `use_tls`:
          Configure whether to use TLS encryption on the connection. This setting
          is deprecated. TLS will be automatically enabled if the `service_url` is
          set to `pulsar+ssl://` or `https://`
        * `tls_trust_certs_file_path`:
          Set the path to the trusted TLS certificate file. If empty defaults to
          certifi.
        * `tls_allow_insecure_connection`:
          Configure whether the Pulsar client accepts untrusted TLS certificates
          from the broker.
        * `tls_validate_hostname`:
          Configure whether the Pulsar client validates that the hostname of the
          endpoint, matches the common name on the TLS certificate presented by
          the endpoint.
        * `logger`:
          Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
        * `connection_timeout_ms`:
          Set timeout in milliseconds on TCP connections.
        """
        _check_type(str, service_url, 'service_url')
        _check_type_or_none(Authentication, authentication, 'authentication')
        _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
        _check_type(int, connection_timeout_ms, 'connection_timeout_ms')
        _check_type(int, io_threads, 'io_threads')
        _check_type(int, message_listener_threads, 'message_listener_threads')
        _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
        _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
        _check_type(bool, use_tls, 'use_tls')
        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
        _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
        _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
        _check_type_or_none(logging.Logger, logger, 'logger')

        conf = _pulsar.ClientConfiguration()
        if authentication:
            # Hand over the native provider held by the Python wrapper.
            conf.authentication(authentication.auth)
        conf.operation_timeout_seconds(operation_timeout_seconds)
        conf.connection_timeout(connection_timeout_ms)
        conf.io_threads(io_threads)
        conf.message_listener_threads(message_listener_threads)
        conf.concurrent_lookup_requests(concurrent_lookup_requests)
        if log_conf_file_path:
            conf.log_conf_file_path(log_conf_file_path)
        if logger:
            conf.set_logger(logger)
        # TLS is switched on either explicitly or implicitly by the URL scheme.
        if use_tls or service_url.startswith(('pulsar+ssl://', 'https://')):
            conf.use_tls(True)
        if tls_trust_certs_file_path:
            conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
        else:
            # Fall back to the CA bundle shipped with certifi.
            conf.tls_trust_certs_file_path(certifi.where())
        conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
        conf.tls_validate_hostname(tls_validate_hostname)

        self._client = _pulsar.Client(service_url, conf)
        # Consumers and readers created through this client; entries are
        # removed again by `Consumer.close()` / `Reader.close()`.
        self._consumers = []

    def create_producer(self, topic,
                        producer_name=None,
                        schema=schema.BytesSchema(),
                        initial_sequence_id=None,
                        send_timeout_millis=30000,
                        compression_type=CompressionType.NONE,
                        max_pending_messages=1000,
                        max_pending_messages_across_partitions=50000,
                        block_if_queue_full=False,
                        batching_enabled=False,
                        batching_max_messages=1000,
                        batching_max_allowed_size_in_bytes=128*1024,
                        batching_max_publish_delay_ms=10,
                        message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
                        lazy_start_partitioned_producers=False,
                        properties=None,
                        batching_type=BatchingType.Default,
                        encryption_key=None,
                        crypto_key_reader=None
                        ):
        """
        Create a new producer on a given topic.

        **Args**

        * `topic`:
          The topic name

        **Options**

        * `producer_name`:
          Specify a name for the producer. If not assigned,
          the system will generate a globally unique name which can be accessed
          with `Producer.producer_name()`. When specifying a name, it is up to
          the user to ensure that, for a given topic, the producer name is unique
          across all Pulsar's clusters.
        * `schema`:
          Define the schema of the data that will be published by this producer.
          The schema will be used for two purposes:
            - Validate the data format against the topic defined schema
            - Perform serialization/deserialization between data and objects
          An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
        * `initial_sequence_id`:
          Set the baseline for the sequence ids for messages
          published by the producer. First message will be using
          `(initial_sequence_id + 1)` as its sequence id and subsequent messages will
          be assigned incremental sequence ids, if not otherwise specified.
          An explicit value of `0` is honoured; only `None` leaves the
          configuration untouched.
        * `send_timeout_millis`:
          If a message is not acknowledged by the server before the
          `send_timeout` expires, an error will be reported.
        * `compression_type`:
          Set the compression type for the producer. By default, message
          payloads are not compressed. Supported compression types are
          `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
          ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
          release in order to be able to receive messages compressed with ZSTD.
          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
          release in order to be able to receive messages compressed with SNAPPY.
        * `max_pending_messages`:
          Set the max size of the queue holding the messages pending to receive
          an acknowledgment from the broker.
        * `max_pending_messages_across_partitions`:
          Set the max size of the queue holding the messages pending to receive
          an acknowledgment across partitions from the broker.
        * `block_if_queue_full`: Set whether `send_async` operations should
          block when the outgoing message queue is full.
        * `batching_enabled`, `batching_max_messages`,
          `batching_max_allowed_size_in_bytes`, `batching_max_publish_delay_ms`:
          Passed through unchanged to the corresponding batching settings of
          the producer configuration.
        * `message_routing_mode`:
          Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
          other option is `PartitionsRoutingMode.UseSinglePartition`
        * `lazy_start_partitioned_producers`:
          This config affects producers of partitioned topics only. It controls whether
          producers register and connect immediately to the owner broker of each partition
          or start lazily on demand. The internal producer of one partition is always
          started eagerly, chosen by the routing policy, but the internal producers of
          any additional partitions are started on demand, upon receiving their first
          message.
          Using this mode can reduce the strain on brokers for topics with large numbers of
          partitions and when the SinglePartition routing policy is used without keyed messages.
          Because producer connection can be on demand, this can produce extra send latency
          for the first messages of a given partition.
        * `properties`:
          Sets the properties for the producer. The properties associated with a producer
          can be used to identify a producer at broker side.
        * `batching_type`:
          Sets the batching type for the producer.
          There are two batching types: DefaultBatching and KeyBasedBatching.
            - Default batching
              incoming single messages:
              (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
              batched into single batch message:
              [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
            - KeyBasedBatching
              incoming single messages:
              (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
              batched into single batch message:
              [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
        * `encryption_key`:
          The key used for symmetric encryption, configured on the producer side
        * `crypto_key_reader`:
          Symmetric encryption class implementation, configuring public key encryption messages for the producer
          and private key decryption messages for the consumer
        """
        _check_type(str, topic, 'topic')
        _check_type_or_none(str, producer_name, 'producer_name')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
        _check_type(int, send_timeout_millis, 'send_timeout_millis')
        _check_type(CompressionType, compression_type, 'compression_type')
        _check_type(int, max_pending_messages, 'max_pending_messages')
        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
        _check_type(bool, batching_enabled, 'batching_enabled')
        _check_type(int, batching_max_messages, 'batching_max_messages')
        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
        _check_type_or_none(dict, properties, 'properties')
        _check_type(BatchingType, batching_type, 'batching_type')
        _check_type_or_none(str, encryption_key, 'encryption_key')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
        _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')

        conf = _pulsar.ProducerConfiguration()
        conf.send_timeout_millis(send_timeout_millis)
        conf.compression_type(compression_type)
        conf.max_pending_messages(max_pending_messages)
        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
        conf.block_if_queue_full(block_if_queue_full)
        conf.batching_enabled(batching_enabled)
        conf.batching_max_messages(batching_max_messages)
        conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
        conf.partitions_routing_mode(message_routing_mode)
        conf.batching_type(batching_type)
        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
        if producer_name:
            conf.producer_name(producer_name)
        # Compare against None explicitly: 0 is a legitimate baseline and a
        # plain truthiness test would silently discard it.
        if initial_sequence_id is not None:
            conf.initial_sequence_id(initial_sequence_id)
        if properties:
            for k, v in properties.items():
                conf.property(k, v)

        conf.schema(schema.schema_info())
        if encryption_key:
            conf.encryption_key(encryption_key)
        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        p = Producer()
        p._producer = self._client.create_producer(topic, conf)
        p._schema = schema
        p._client = self._client
        return p

    def subscribe(self, topic, subscription_name,
                  consumer_type=ConsumerType.Exclusive,
                  schema=schema.BytesSchema(),
                  message_listener=None,
                  receiver_queue_size=1000,
                  max_total_receiver_queue_size_across_partitions=50000,
                  consumer_name=None,
                  unacked_messages_timeout_ms=None,
                  broker_consumer_stats_cache_time_ms=30000,
                  negative_ack_redelivery_delay_ms=60000,
                  is_read_compacted=False,
                  properties=None,
                  pattern_auto_discovery_period=60,
                  initial_position=InitialPosition.Latest,
                  crypto_key_reader=None,
                  replicate_subscription_state_enabled=False
                  ):
        """
        Subscribe to the given topic and subscription combination.

        **Args**

        * `topic`: The name of the topic, list of topics or regex pattern.
          This method will accept these forms:
            - `topic='my-topic'`
            - `topic=['topic-1', 'topic-2', 'topic-3']`
            - `topic=re.compile('persistent://public/default/topic-*')`
        * `subscription_name`: The name of the subscription.

        **Options**

        * `consumer_type`:
          Select the subscription type to be used when subscribing to the topic.
        * `schema`:
          Define the schema of the data that will be received by this consumer.
        * `message_listener`:
          Sets a message listener for the consumer. When the listener is set,
          the application will receive messages through it. Calls to
          `consumer.receive()` will not be allowed. The listener function needs
          to accept (consumer, message), for example:

                #!python
                def my_listener(consumer, message):
                    # process message
                    consumer.acknowledge(message)

        * `receiver_queue_size`:
          Sets the size of the consumer receive queue. The consumer receive
          queue controls how many messages can be accumulated by the consumer
          before the application calls `receive()`. Using a higher value could
          potentially increase the consumer throughput at the expense of higher
          memory utilization. Setting the consumer queue size to zero decreases
          the throughput of the consumer by disabling pre-fetching of messages.
          This approach improves the message distribution on shared subscription
          by pushing messages only to those consumers that are ready to process
          them. Neither receive with timeout nor partitioned topics can be used
          if the consumer queue size is zero. The `receive()` function call
          should not be interrupted when the consumer queue size is zero. The
          default value is 1000 messages and should work well for most use
          cases.
        * `max_total_receiver_queue_size_across_partitions`:
          Set the max total receiver queue size across partitions.
          This setting will be used to reduce the receiver queue size for individual partitions
        * `consumer_name`:
          Sets the consumer name.
        * `unacked_messages_timeout_ms`:
          Sets the timeout in milliseconds for unacknowledged messages. The
          timeout needs to be greater than 10 seconds. An exception is thrown if
          the given value is less than 10 seconds. If a successful
          acknowledgement is not sent within the timeout, all the unacknowledged
          messages are redelivered.
        * `negative_ack_redelivery_delay_ms`:
          The delay after which to redeliver the messages that failed to be
          processed (with the `consumer.negative_acknowledge()`)
        * `broker_consumer_stats_cache_time_ms`:
          Sets the time duration for which the broker-side consumer stats will
          be cached in the client.
        * `is_read_compacted`:
          Selects whether to read the compacted version of the topic
        * `properties`:
          Sets the properties for the consumer. The properties associated with a consumer
          can be used to identify a consumer at broker side.
        * `pattern_auto_discovery_period`:
          Periods of seconds for consumer to auto discover match topics.
        * `initial_position`:
          Set the initial position of a consumer when subscribing to the topic.
          It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
          Default: `Latest`.
        * `crypto_key_reader`:
          Symmetric encryption class implementation, configuring public key encryption messages for the producer
          and private key decryption messages for the consumer
        * `replicate_subscription_state_enabled`:
          Set whether the subscription status should be replicated.
          Default: `False`.

        Raises `ValueError` when `topic` is not a `str`, a `list` or a compiled
        regular expression.
        """
        _check_type(str, subscription_name, 'subscription_name')
        _check_type(ConsumerType, consumer_type, 'consumer_type')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type(int, receiver_queue_size, 'receiver_queue_size')
        _check_type(int, max_total_receiver_queue_size_across_partitions,
                    'max_total_receiver_queue_size_across_partitions')
        _check_type_or_none(str, consumer_name, 'consumer_name')
        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
        _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
        # NOTE(review): `pattern_auto_discovery_period` is validated here but is
        # not forwarded to the consumer configuration anywhere in this method.
        # Confirm whether the native configuration exposes a setter for it.
        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
        _check_type(bool, is_read_compacted, 'is_read_compacted')
        _check_type_or_none(dict, properties, 'properties')
        _check_type(InitialPosition, initial_position, 'initial_position')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')

        conf = _pulsar.ConsumerConfiguration()
        conf.consumer_type(consumer_type)
        conf.read_compacted(is_read_compacted)
        if message_listener:
            conf.message_listener(_listener_wrapper(message_listener, schema))
        conf.receiver_queue_size(receiver_queue_size)
        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
        if consumer_name:
            conf.consumer_name(consumer_name)
        if unacked_messages_timeout_ms:
            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)

        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
        if properties:
            for k, v in properties.items():
                conf.property(k, v)
        conf.subscription_initial_position(initial_position)

        conf.schema(schema.schema_info())

        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)

        c = Consumer()
        # The shape of `topic` selects which native subscribe call is used.
        if isinstance(topic, str):
            # Single topic
            c._consumer = self._client.subscribe(topic, subscription_name, conf)
        elif isinstance(topic, list):
            # List of topics
            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
        elif isinstance(topic, _retype):
            # Regex pattern
            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
        else:
            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")

        c._client = self
        c._schema = schema
        self._consumers.append(c)
        return c

    def create_reader(self, topic, start_message_id,
                      schema=schema.BytesSchema(),
                      reader_listener=None,
                      receiver_queue_size=1000,
                      reader_name=None,
                      subscription_role_prefix=None,
                      is_read_compacted=False,
                      crypto_key_reader=None
                      ):
        """
        Create a reader on a particular topic

        **Args**

        * `topic`: The name of the topic.
        * `start_message_id`: The initial reader positioning is done by specifying a message id.
           The options are:
            * `MessageId.earliest`: Start reading from the earliest message available in the topic
            * `MessageId.latest`: Start reading from the end topic, only getting messages published
               after the reader was created
            * `MessageId`: When passing a particular message id, the reader will position itself on
               that specific position. The first message to be read will be the message next to the
               specified messageId. Message id can be serialized into a string and deserialized
               back into a `MessageId` object:

                   # Serialize to string
                   s = msg.message_id().serialize()

                   # Deserialize from string
                   msg_id = MessageId.deserialize(s)

        **Options**

        * `schema`:
          Define the schema of the data that will be received by this reader.
        * `reader_listener`:
          Sets a message listener for the reader. When the listener is set,
          the application will receive messages through it. Calls to
          `reader.read_next()` will not be allowed. The listener function needs
          to accept (reader, message), for example:

                def my_listener(reader, message):
                    # process message
                    pass

        * `receiver_queue_size`:
          Sets the size of the reader receive queue. The reader receive
          queue controls how many messages can be accumulated by the reader
          before the application calls `read_next()`. Using a higher value could
          potentially increase the reader throughput at the expense of higher
          memory utilization.
        * `reader_name`:
          Sets the reader name.
        * `subscription_role_prefix`:
          Sets the subscription role prefix.
        * `is_read_compacted`:
          Selects whether to read the compacted version of the topic
        * `crypto_key_reader`:
          Symmetric encryption class implementation, configuring public key encryption messages for the producer
          and private key decryption messages for the consumer
        """
        _check_type(str, topic, 'topic')
        _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type(int, receiver_queue_size, 'receiver_queue_size')
        _check_type_or_none(str, reader_name, 'reader_name')
        _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
        _check_type(bool, is_read_compacted, 'is_read_compacted')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')

        conf = _pulsar.ReaderConfiguration()
        if reader_listener:
            conf.reader_listener(_listener_wrapper(reader_listener, schema))
        conf.receiver_queue_size(receiver_queue_size)
        if reader_name:
            conf.reader_name(reader_name)
        if subscription_role_prefix:
            conf.subscription_role_prefix(subscription_role_prefix)
        conf.schema(schema.schema_info())
        conf.read_compacted(is_read_compacted)
        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        c = Reader()
        c._reader = self._client.create_reader(topic, start_message_id, conf)
        c._client = self
        c._schema = schema
        # Readers share the bookkeeping list used for consumers.
        self._consumers.append(c)
        return c

    def get_topic_partitions(self, topic):
        """
        Get the list of partitions for a given topic.

        If the topic is partitioned, this will return a list of partition names. If the topic is not
        partitioned, the returned list will contain the topic name itself.

        This can be used to discover the partitions and create Reader, Consumer or Producer
        instances directly on a particular partition.

        **Args**

        * `topic`: the topic name to lookup

        Returns a list of partition names.
        """
        _check_type(str, topic, 'topic')
        return self._client.get_topic_partitions(topic)

    def shutdown(self):
        """
        Perform immediate shutdown of Pulsar client.

        Release all resources and close all producer, consumer, and readers without waiting
        for ongoing operations to complete.
        """
        self._client.shutdown()

    def close(self):
        """
        Close the client and all the associated producers and consumers
        """
        self._client.close()
class Producer:
    """
    The Pulsar message producer, used to publish messages on a topic.
    """

    def topic(self):
        """
        Return the topic which producer is publishing to
        """
        return self._producer.topic()

    def producer_name(self):
        """
        Return the producer name which could have been assigned by the
        system or specified by the client
        """
        return self._producer.producer_name()

    def last_sequence_id(self):
        """
        Get the last sequence id that was published by this producer.

        This represents either the automatically assigned or custom sequence id
        (set on the `MessageBuilder`) that was published and acknowledged by the broker.

        After recreating a producer with the same producer name, this will return the
        last message that was published in the previous producer session, or -1 if
        no message was ever published.
        """
        return self._producer.last_sequence_id()

    def send(self, content,
             properties=None,
             partition_key=None,
             sequence_id=None,
             replication_clusters=None,
             disable_replication=False,
             event_timestamp=None,
             deliver_at=None,
             deliver_after=None,
             ):
        """
        Publish a message on the topic. Blocks until the message is acknowledged

        Returns a `MessageId` object that represents where the message is persisted.

        **Args**

        * `content`:
          The message payload. It is encoded with the producer's schema and the
          encoded result must be a `bytes` object.

        **Options**

        * `properties`:
          A dict of application-defined string properties.
        * `partition_key`:
          Sets the partition key for message routing. A hash of this key is used
          to determine the message's topic partition.
        * `sequence_id`:
          Specify a custom sequence id for the message being published.
          Any integer, including `0`, is forwarded; `None` leaves it unset.
        * `replication_clusters`:
          Override namespace replication clusters. Note that it is the caller's
          responsibility to provide valid cluster names and that all clusters
          have been previously configured as topics. Given an empty list,
          the message will replicate according to the namespace configuration.
        * `disable_replication`:
          Do not replicate this message.
        * `event_timestamp`:
          Timestamp in millis of the timestamp of event creation
        * `deliver_at`:
          Specify that this message should not be delivered earlier than the
          specified timestamp.
          The timestamp is milliseconds and based on UTC
        * `deliver_after`:
          Specify a delay in timedelta for the delivery of the messages.
        """
        msg = self._build_msg(content, properties, partition_key, sequence_id,
                              replication_clusters, disable_replication, event_timestamp,
                              deliver_at, deliver_after)
        return MessageId.deserialize(self._producer.send(msg))

    def send_async(self, content, callback,
                   properties=None,
                   partition_key=None,
                   sequence_id=None,
                   replication_clusters=None,
                   disable_replication=False,
                   event_timestamp=None,
                   deliver_at=None,
                   deliver_after=None,
                   ):
        """
        Send a message asynchronously.

        The `callback` will be invoked once the message has been acknowledged
        by the broker.

        Example:

            #!python
            def callback(res, msg_id):
                print('Message published: %s' % res)

            producer.send_async(msg, callback)

        When the producer queue is full, by default the message will be rejected
        and the callback invoked with an error code.

        **Args**

        * `content`:
          The message payload. It is encoded with the producer's schema and the
          encoded result must be a `bytes` object.
        * `callback`:
          Function handed to the underlying producer together with the message.

        **Options**

        * `properties`:
          A dict of application-defined string properties.
        * `partition_key`:
          Sets the partition key for the message routing. A hash of this key is
          used to determine the message's topic partition.
        * `sequence_id`:
          Specify a custom sequence id for the message being published.
          Any integer, including `0`, is forwarded; `None` leaves it unset.
        * `replication_clusters`: Override namespace replication clusters. Note
          that it is the caller's responsibility to provide valid cluster names
          and that all clusters have been previously configured as topics.
          Given an empty list, the message will replicate per the namespace
          configuration.
        * `disable_replication`:
          Do not replicate this message.
        * `event_timestamp`:
          Timestamp in millis of the timestamp of event creation
        * `deliver_at`:
          Specify that this message should not be delivered earlier than the
          specified timestamp.
          The timestamp is milliseconds and based on UTC
        * `deliver_after`:
          Specify a delay in timedelta for the delivery of the messages.
        """
        msg = self._build_msg(content, properties, partition_key, sequence_id,
                              replication_clusters, disable_replication, event_timestamp,
                              deliver_at, deliver_after)
        self._producer.send_async(msg, callback)

    def flush(self):
        """
        Flush all the messages buffered in the client and wait until all messages have been
        successfully persisted
        """
        self._producer.flush()

    def close(self):
        """
        Close the producer.
        """
        self._producer.close()

    def _build_msg(self, content, properties, partition_key, sequence_id,
                   replication_clusters, disable_replication, event_timestamp,
                   deliver_at, deliver_after):
        """
        Encode `content` with the producer's schema, validate the per-message
        options and assemble the native message through a `MessageBuilder`.

        Returns whatever `MessageBuilder.build()` produces.
        """
        data = self._schema.encode(content)

        _check_type(bytes, data, 'data')
        _check_type_or_none(dict, properties, 'properties')
        _check_type_or_none(str, partition_key, 'partition_key')
        _check_type_or_none(int, sequence_id, 'sequence_id')
        _check_type_or_none(list, replication_clusters, 'replication_clusters')
        _check_type(bool, disable_replication, 'disable_replication')
        _check_type_or_none(int, event_timestamp, 'event_timestamp')
        _check_type_or_none(int, deliver_at, 'deliver_at')
        _check_type_or_none(timedelta, deliver_after, 'deliver_after')

        mb = _pulsar.MessageBuilder()
        mb.content(data)
        if properties:
            for k, v in properties.items():
                mb.property(k, v)
        if partition_key:
            mb.partition_key(partition_key)
        # Compare against None explicitly: 0 is a valid custom sequence id and
        # a plain truthiness test would silently drop it.
        if sequence_id is not None:
            mb.sequence_id(sequence_id)
        if replication_clusters:
            mb.replication_clusters(replication_clusters)
        if disable_replication:
            mb.disable_replication(disable_replication)
        if event_timestamp:
            mb.event_timestamp(event_timestamp)
        if deliver_at:
            mb.deliver_at(deliver_at)
        if deliver_after:
            mb.deliver_after(deliver_after)

        return mb.build()

    def is_connected(self):
        """
        Check if the producer is connected or not.
        """
        return self._producer.is_connected()
class Consumer:
"""
Pulsar consumer.
"""
def topic(self):
"""
Return the topic this consumer is subscribed to.
"""
return self._consumer.topic()
def subscription_name(self):
"""
Return the subscription name.
"""
return self._consumer.subscription_name()
def unsubscribe(self):
"""
Unsubscribe the current consumer from the topic.
This method will block until the operation is completed. Once the
consumer is unsubscribed, no more messages will be received and
subsequent new messages will not be retained for this consumer.
This consumer object cannot be reused.
"""
return self._consumer.unsubscribe()
def receive(self, timeout_millis=None):
"""
Receive a single message.
If a message is not immediately available, this method will block until
a new message is available.
**Options**
* `timeout_millis`:
If specified, the receive will raise an exception if a message is not
available within the timeout.
"""
if timeout_millis is None:
msg = self._consumer.receive()
else:
_check_type(int, timeout_millis, 'timeout_millis')
msg = self._consumer.receive(timeout_millis)
m = Message()
m._message = msg
m._schema = self._schema
return m
def acknowledge(self, message):
"""
Acknowledge the reception of a single message.
This method will block until an acknowledgement is sent to the broker.
After that, the message will not be re-delivered to this consumer.
**Args**
* `message`:
The received message or message id.
"""
if isinstance(message, Message):
self._consumer.acknowledge(message._message)
else:
self._consumer.acknowledge(message)
def acknowledge_cumulative(self, message):
"""
Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.
This method will block until an acknowledgement is sent to the broker.
After that, the messages will not be re-delivered to this consumer.
**Args**
* `message`:
The received message or message id.
"""
if isinstance(message, Message):
self._consumer.acknowledge_cumulative(message._message)
else:
self._consumer.acknowledge_cumulative(message)
def negative_acknowledge(self, message):
"""
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after
some fixed delay. The delay is configurable when constructing the consumer
with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
This call is not blocking.
**Args**
* `message`:
The received message or message id.
"""
if isinstance(message, Message):
self._consumer.negative_acknowledge(message._message)
else:
self._consumer.negative_acknowledge(message)
def pause_message_listener(self):
"""
Pause receiving messages via the `message_listener` until
`resume_message_listener()` is called.
"""
self._consumer.pause_message_listener()
def resume_message_listener(self):
"""
Resume receiving the messages via the message listener.
Asynchronously receive all the messages enqueued from the time
`pause_message_listener()` was called.
"""
self._consumer.resume_message_listener()
def redeliver_unacknowledged_messages(self):
"""
Redelivers all the unacknowledged messages. In failover mode, the
request is ignored if the consumer is not active for the given topic. In
shared mode, the consumer's messages to be redelivered are distributed
across all the connected consumers. This is a non-blocking call and
doesn't throw an exception. In case the connection breaks, the messages
are redelivered after reconnect.
"""
self._consumer.redeliver_unacknowledged_messages()
def seek(self, messageid):
"""
Reset the subscription associated with this consumer to a specific message id or publish timestamp.
The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
seek() on the individual partitions.
**Args**
* `message`:
The message id for seek, OR an integer event time to seek to
"""
self._consumer.seek(messageid)
def close(self):
"""
Close the consumer.
"""
self._consumer.close()
self._client._consumers.remove(self)
def is_connected(self):
    """
    Report whether the consumer is connected, as stated by the underlying
    consumer object.
    """
    connected = self._consumer.is_connected()
    return connected
class Reader:
    """
    Pulsar topic reader.

    The methods below expect `_reader`, `_schema` and `_client` to have been
    assigned on the instance before they are called.
    """

    def topic(self):
        """
        Return the topic this reader is reading from.
        """
        return self._reader.topic()

    def read_next(self, timeout_millis=None):
        """
        Read a single message.

        If a message is not immediately available, this method will block until
        a new message is available.

        **Options**

        * `timeout_millis`:
          If specified, the receive will raise an exception if a message is not
          available within the timeout.
        """
        if timeout_millis is None:
            msg = self._reader.read_next()
        else:
            _check_type(int, timeout_millis, 'timeout_millis')
            msg = self._reader.read_next(timeout_millis)
        # Wrap the raw message together with the reader's schema.
        m = Message()
        m._message = msg
        m._schema = self._schema
        return m

    def has_message_available(self):
        """
        Check if there is any message available to read from the current position.
        """
        return self._reader.has_message_available()

    def seek(self, messageid):
        """
        Reset this reader to a specific message id or publish timestamp.

        The message id can either be a specific message or represent the first or last messages in the topic.

        Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
        seek() on the individual partitions instead.

        **Args**

        * `messageid`:
          The message id for seek, OR an integer event time to seek to
        """
        self._reader.seek(messageid)

    def close(self):
        """
        Close the reader and remove it from the owning client's `_consumers` list.
        """
        self._reader.close()
        self._client._consumers.remove(self)

    def is_connected(self):
        """
        Check if the reader is connected or not.
        """
        return self._reader.is_connected()
class CryptoKeyReader:
    """
    Default crypto key reader implementation.

    Wraps a `_pulsar.CryptoKeyReader`, exposed as the `cryptoKeyReader`
    attribute.
    """

    def __init__(self, public_key_path, private_key_path):
        """
        Create crypto key reader.

        **Args**

        * `public_key_path`: Path to the public key
        * `private_key_path`: Path to private key
        """
        # Both paths must be `str`; they are checked in declaration order.
        for arg_name, arg_value in (('public_key_path', public_key_path),
                                    ('private_key_path', private_key_path)):
            _check_type(str, arg_value, arg_name)
        native_reader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
        self.cryptoKeyReader = native_reader
def _check_type(var_type, var, name):
if not isinstance(var, var_type):
raise ValueError("Argument %s is expected to be of type '%s' and not '%s'"
% (name, var_type.__name__, type(var).__name__))
def _check_type_or_none(var_type, var, name):
if var is not None and not isinstance(var, var_type):
raise ValueError("Argument %s is expected to be either None or of type '%s'"
% (name, var_type.__name__))
def _listener_wrapper(listener, schema):
def wrapper(consumer, msg):
c = Consumer()
c._consumer = consumer
m = Message()
m._message = msg
m._schema = schema
listener(c, m)
return wrapper
</code></pre>
</div>
</header>
<section id="section-items">
<h2 class="section-title" id="header-classes">Classes</h2>
<div class="item">
<p id="pulsar.Authentication" class="name">class <span class="ident">Authentication</span></p>
<div class="desc"><p>Authentication provider object. Used to load authentication from an external
shared library.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Authentication', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Authentication" class="source">
<pre><code>class Authentication:
"""
Authentication provider object. Used to load authentication from an external
shared library.
"""
def __init__(self, dynamicLibPath, authParamsString):
    """
    Create the authentication provider instance.

    **Args**

    * `dynamicLibPath`: Path to the authentication provider shared library
      (such as `tls.so`)
    * `authParamsString`: Comma-separated list of provider-specific
      configuration params
    """
    # Both arguments must be `str`; they are checked in declaration order.
    for arg_name, arg_value in (('dynamicLibPath', dynamicLibPath),
                                ('authParamsString', authParamsString)):
        _check_type(str, arg_value, arg_name)
    provider = _pulsar.Authentication(dynamicLibPath, authParamsString)
    self.auth = provider
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.Authentication">Authentication</a></li>
</ul>
<h3>Instance variables</h3>
<div class="item">
<p id="pulsar.Authentication.auth" class="name">var <span class="ident">auth</span></p>
<div class="source_cont">
</div>
</div>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.Authentication.__init__">
<p>def <span class="ident">__init__</span>(</p><p>self, dynamicLibPath, authParamsString)</p>
</div>
<div class="desc"><p>Create the authentication provider instance.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>dynamicLibPath</code>: Path to the authentication provider shared library
(such as <code>tls.so</code>)</li>
<li><code>authParamsString</code>: Comma-separated list of provider-specific
configuration params</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Authentication.__init__', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Authentication.__init__" class="source">
<pre><code>def __init__(self, dynamicLibPath, authParamsString):
"""
Create the authentication provider instance.
**Args**
* `dynamicLibPath`: Path to the authentication provider shared library
(such as `tls.so`)
* `authParamsString`: Comma-separated list of provider-specific
configuration params
"""
_check_type(str, dynamicLibPath, 'dynamicLibPath')
_check_type(str, authParamsString, 'authParamsString')
self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.AuthenticationAthenz" class="name">class <span class="ident">AuthenticationAthenz</span></p>
<div class="desc"><p>Athenz Authentication implementation</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationAthenz', this);">Show source &equiv;</a></p>
<div id="source-pulsar.AuthenticationAthenz" class="source">
<pre><code>class AuthenticationAthenz(Authentication):
"""
Athenz Authentication implementation
"""
def __init__(self, auth_params_string):
    """
    Create the Athenz authentication provider instance.

    **Args**

    * `auth_params_string`: JSON encoded configuration for Athenz client
    """
    params = auth_params_string
    _check_type(str, params, 'auth_params_string')
    provider = _pulsar.AuthenticationAthenz(params)
    self.auth = provider
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.AuthenticationAthenz">AuthenticationAthenz</a></li>
<li><a href="#pulsar.Authentication">Authentication</a></li>
</ul>
<h3>Instance variables</h3>
<div class="item">
<p id="pulsar.AuthenticationAthenz.auth" class="name">var <span class="ident">auth</span></p>
<p class="inheritance">
<strong>Inheritance:</strong>
<code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
</p>
<div class="source_cont">
</div>
</div>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.AuthenticationAthenz.__init__">
<p>def <span class="ident">__init__</span>(</p><p>self, auth_params_string)</p>
</div>
<p class="inheritance">
<strong>Inheritance:</strong>
<code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
</p>
<div class="desc"><p>Create the Athenz authentication provider instance.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>auth_params_string</code>: JSON encoded configuration for Athenz client</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationAthenz.__init__', this);">Show source &equiv;</a></p>
<div id="source-pulsar.AuthenticationAthenz.__init__" class="source">
<pre><code>def __init__(self, auth_params_string):
"""
Create the Athenz authentication provider instance.
**Args**
* `auth_params_string`: JSON encoded configuration for Athenz client
"""
_check_type(str, auth_params_string, 'auth_params_string')
self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.AuthenticationOauth2" class="name">class <span class="ident">AuthenticationOauth2</span></p>
<div class="desc"><p>Oauth2 Authentication implementation</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationOauth2', this);">Show source &equiv;</a></p>
<div id="source-pulsar.AuthenticationOauth2" class="source">
<pre><code>class AuthenticationOauth2(Authentication):
"""
Oauth2 Authentication implementation
"""
def __init__(self, auth_params_string):
    """
    Create the Oauth2 authentication provider instance.

    **Args**

    * `auth_params_string`: JSON encoded configuration for Oauth2 client
    """
    params = auth_params_string
    _check_type(str, params, 'auth_params_string')
    provider = _pulsar.AuthenticationOauth2(params)
    self.auth = provider
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.AuthenticationOauth2">AuthenticationOauth2</a></li>
<li><a href="#pulsar.Authentication">Authentication</a></li>
</ul>
<h3>Instance variables</h3>
<div class="item">
<p id="pulsar.AuthenticationOauth2.auth" class="name">var <span class="ident">auth</span></p>
<p class="inheritance">
<strong>Inheritance:</strong>
<code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
</p>
<div class="source_cont">
</div>
</div>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.AuthenticationOauth2.__init__">
<p>def <span class="ident">__init__</span>(</p><p>self, auth_params_string)</p>
</div>
<p class="inheritance">
<strong>Inheritance:</strong>
<code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
</p>
<div class="desc"><p>Create the Oauth2 authentication provider instance.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>auth_params_string</code>: JSON encoded configuration for Oauth2 client</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationOauth2.__init__', this);">Show source &equiv;</a></p>
<div id="source-pulsar.AuthenticationOauth2.__init__" class="source">
<pre><code>def __init__(self, auth_params_string):
"""
Create the Oauth2 authentication provider instance.
**Args**
* `auth_params_string`: JSON encoded configuration for Oauth2 client
"""
_check_type(str, auth_params_string, 'auth_params_string')
self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.AuthenticationTLS" class="name">class <span class="ident">AuthenticationTLS</span></p>
<div class="desc"><p>TLS Authentication implementation</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationTLS', this);">Show source &equiv;</a></p>
<div id="source-pulsar.AuthenticationTLS" class="source">
<pre><code>class AuthenticationTLS(Authentication):
"""
TLS Authentication implementation
"""
def __init__(self, certificate_path, private_key_path):
    """
    Create the TLS authentication provider instance.

    **Args**

    * `certificate_path`: Path to the public certificate
    * `private_key_path`: Path to private TLS key
    """
    # Both paths must be `str`; the type check fails before the provider is
    # created.
    _check_type(str, certificate_path, 'certificate_path')
    _check_type(str, private_key_path, 'private_key_path')
    self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.AuthenticationTLS">AuthenticationTLS</a></li>
<li><a href="#pulsar.Authentication">Authentication</a></li>
</ul>
<h3>Instance variables</h3>
<div class="item">
<p id="pulsar.AuthenticationTLS.auth" class="name">var <span class="ident">auth</span></p>
<p class="inheritance">
<strong>Inheritance:</strong>
<code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
</p>
<div class="source_cont">
</div>
</div>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.AuthenticationTLS.__init__">
<p>def <span class="ident">__init__</span>(</p><p>self, certificate_path, private_key_path)</p>
</div>
<p class="inheritance">
<strong>Inheritance:</strong>
<code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
</p>
<div class="desc"><p>Create the TLS authentication provider instance.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>certificate_path</code>: Path to the public certificate</li>
<li><code>private_key_path</code>: Path to private TLS key</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationTLS.__init__', this);">Show source &equiv;</a></p>
<div id="source-pulsar.AuthenticationTLS.__init__" class="source">
<pre><code>def __init__(self, certificate_path, private_key_path):
"""
Create the TLS authentication provider instance.
**Args**
* `certificate_path`: Path to the public certificate
* `private_key_path`: Path to private TLS key
"""
_check_type(str, certificate_path, 'certificate_path')
_check_type(str, private_key_path, 'private_key_path')
self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.AuthenticationToken" class="name">class <span class="ident">AuthenticationToken</span></p>
<div class="desc"><p>Token based authentication implementation</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationToken', this);">Show source &equiv;</a></p>
<div id="source-pulsar.AuthenticationToken" class="source">
<pre><code>class AuthenticationToken(Authentication):
"""
Token based authentication implementation
"""
def __init__(self, token):
"""
Create the token authentication provider instance.
**Args**
* `token`: A string containing the token or a functions that provides a
string with the token
"""
if not (isinstance(token, str) or callable(token)):
raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
self.auth = _pulsar.AuthenticationToken(token)
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.AuthenticationToken">AuthenticationToken</a></li>
<li><a href="#pulsar.Authentication">Authentication</a></li>
</ul>
<h3>Instance variables</h3>
<div class="item">
<p id="pulsar.AuthenticationToken.auth" class="name">var <span class="ident">auth</span></p>
<p class="inheritance">
<strong>Inheritance:</strong>
<code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
</p>
<div class="source_cont">
</div>
</div>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.AuthenticationToken.__init__">
<p>def <span class="ident">__init__</span>(</p><p>self, token)</p>
</div>
<p class="inheritance">
<strong>Inheritance:</strong>
<code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
</p>
<div class="desc"><p>Create the token authentication provider instance.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>token</code>: A string containing the token or a function that provides a
string with the token</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationToken.__init__', this);">Show source &equiv;</a></p>
<div id="source-pulsar.AuthenticationToken.__init__" class="source">
<pre><code>def __init__(self, token):
"""
Create the token authentication provider instance.
**Args**
* `token`: A string containing the token or a function that provides a
string with the token
"""
if not (isinstance(token, str) or callable(token)):
raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
self.auth = _pulsar.AuthenticationToken(token)
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.Client" class="name">class <span class="ident">Client</span></p>
<div class="desc"><p>The Pulsar client. A single client instance can be used to create producers
and consumers on multiple topics.</p>
<p>The client will share the same connection pool and threads across all
producers and consumers.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Client" class="source">
<pre><code>class Client:
"""
The Pulsar client. A single client instance can be used to create producers
and consumers on multiple topics.
The client will share the same connection pool and threads across all
producers and consumers.
"""
def __init__(self, service_url,
authentication=None,
operation_timeout_seconds=30,
io_threads=1,
message_listener_threads=1,
concurrent_lookup_requests=50000,
log_conf_file_path=None,
use_tls=False,
tls_trust_certs_file_path=None,
tls_allow_insecure_connection=False,
tls_validate_hostname=False,
logger=None,
connection_timeout_ms=10000,
):
"""
Create a new Pulsar client instance.
**Args**
* `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
**Options**
* `authentication`:
Set the authentication provider to be used with the broker. For example:
`AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
* `operation_timeout_seconds`:
Set timeout on client operations (subscribe, create producer, close,
unsubscribe).
* `io_threads`:
Set the number of IO threads to be used by the Pulsar client.
* `message_listener_threads`:
Set the number of threads to be used by the Pulsar client when
delivering messages through message listener. The default is 1 thread
per Pulsar client. If using more than 1 thread, messages for distinct
`message_listener`s will be delivered in different threads, however a
single `MessageListener` will always be assigned to the same thread.
* `concurrent_lookup_requests`:
Number of concurrent lookup-requests allowed on each broker connection
to prevent overload on the broker.
* `log_conf_file_path`:
Initialize log4cxx from a configuration file.
* `use_tls`:
Configure whether to use TLS encryption on the connection. This setting
is deprecated. TLS will be automatically enabled if the `service_url` is
set to `pulsar+ssl://` or `https://`
* `tls_trust_certs_file_path`:
Set the path to the trusted TLS certificate file. If empty defaults to
certifi.
* `tls_allow_insecure_connection`:
Configure whether the Pulsar client accepts untrusted TLS certificates
from the broker.
* `tls_validate_hostname`:
Configure whether the Pulsar client validates that the hostname of the
endpoint, matches the common name on the TLS certificate presented by
the endpoint.
* `logger`:
Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
* `connection_timeout_ms`:
Set timeout in milliseconds on TCP connections.
"""
# Validate every argument's type before any configuration object is built.
_check_type(str, service_url, 'service_url')
_check_type_or_none(Authentication, authentication, 'authentication')
_check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
_check_type(int, connection_timeout_ms, 'connection_timeout_ms')
_check_type(int, io_threads, 'io_threads')
_check_type(int, message_listener_threads, 'message_listener_threads')
_check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
_check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
_check_type(bool, use_tls, 'use_tls')
_check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
_check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
_check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
_check_type_or_none(logging.Logger, logger, 'logger')
conf = _pulsar.ClientConfiguration()
# The wrapped provider object (`.auth`) is what gets handed to the configuration.
if authentication:
conf.authentication(authentication.auth)
conf.operation_timeout_seconds(operation_timeout_seconds)
conf.connection_timeout(connection_timeout_ms)
conf.io_threads(io_threads)
conf.message_listener_threads(message_listener_threads)
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
if logger:
conf.set_logger(logger)
# TLS is enabled either explicitly via `use_tls` or implicitly by the URL scheme.
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
conf.use_tls(True)
# Without an explicit trust store, use the path reported by certifi.where().
if tls_trust_certs_file_path:
conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
else:
conf.tls_trust_certs_file_path(certifi.where())
conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
conf.tls_validate_hostname(tls_validate_hostname)
self._client = _pulsar.Client(service_url, conf)
# Starts empty; Consumer.close() and Reader.close() remove themselves from this list.
self._consumers = []
def create_producer(self, topic,
                    producer_name=None,
                    schema=schema.BytesSchema(),
                    initial_sequence_id=None,
                    send_timeout_millis=30000,
                    compression_type=CompressionType.NONE,
                    max_pending_messages=1000,
                    max_pending_messages_across_partitions=50000,
                    block_if_queue_full=False,
                    batching_enabled=False,
                    batching_max_messages=1000,
                    batching_max_allowed_size_in_bytes=128*1024,
                    batching_max_publish_delay_ms=10,
                    message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
                    lazy_start_partitioned_producers=False,
                    properties=None,
                    batching_type=BatchingType.Default,
                    encryption_key=None,
                    crypto_key_reader=None
                    ):
    """
    Create a new producer on a given topic.

    Returns a `Producer` with its `_producer`, `_schema` and `_client`
    attributes set.

    **Args**

    * `topic`:
      The topic name

    **Options**

    * `producer_name`:
      Specify a name for the producer. If not assigned,
      the system will generate a globally unique name which can be accessed
      with `Producer.producer_name()`. When specifying a name, it is up to
      the user to ensure that, for a given topic, the producer name is unique
      across all Pulsar's clusters.
    * `schema`:
      Define the schema of the data that will be published by this producer.
      The schema will be used for two purposes:
        - Validate the data format against the topic defined schema
        - Perform serialization/deserialization between data and objects
      An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
    * `initial_sequence_id`:
      Set the baseline for the sequence ids for messages
      published by the producer. First message will be using
      `(initial_sequence_id + 1)` as its sequence id and subsequent messages will
      be assigned incremental sequence ids, if not otherwise specified.
      An explicit `0` is honoured; only `None` leaves the setting untouched.
    * `send_timeout_millis`:
      If a message is not acknowledged by the server before the
      `send_timeout` expires, an error will be reported.
    * `compression_type`:
      Set the compression type for the producer. By default, message
      payloads are not compressed. Supported compression types are
      `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
      ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
      release in order to be able to receive messages compressed with ZSTD.
      SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
      release in order to be able to receive messages compressed with SNAPPY.
    * `max_pending_messages`:
      Set the max size of the queue holding the messages pending to receive
      an acknowledgment from the broker.
    * `max_pending_messages_across_partitions`:
      Set the max size of the queue holding the messages pending to receive
      an acknowledgment across partitions from the broker.
    * `block_if_queue_full`: Set whether `send_async` operations should
      block when the outgoing message queue is full.
    * `batching_enabled`, `batching_max_messages`,
      `batching_max_allowed_size_in_bytes`, `batching_max_publish_delay_ms`:
      Type-checked (`bool` for the first, `int` for the others) and forwarded
      unchanged to the same-named `ProducerConfiguration` setters.
    * `message_routing_mode`:
      Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
      other option is `PartitionsRoutingMode.UseSinglePartition`
    * `lazy_start_partitioned_producers`:
      This config affects producers of partitioned topics only. It controls whether
      producers register and connect immediately to the owner broker of each partition
      or start lazily on demand. The internal producer of one partition is always
      started eagerly, chosen by the routing policy, but the internal producers of
      any additional partitions are started on demand, upon receiving their first
      message.
      Using this mode can reduce the strain on brokers for topics with large numbers of
      partitions and when the SinglePartition routing policy is used without keyed messages.
      Because producer connection can be on demand, this can produce extra send latency
      for the first messages of a given partition.
    * `properties`:
      Sets the properties for the producer. The properties associated with a producer
      can be used to identify a producer at broker side.
    * `batching_type`:
      Sets the batching type for the producer.
      There are two batching types: DefaultBatching and KeyBasedBatching.
        - Default batching
          incoming single messages:
          (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
          batched into single batch message:
          [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
        - KeyBasedBatching
          incoming single messages:
          (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
          batched into single batch message:
          [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
    * `encryption_key`:
      The key used for symmetric encryption, configured on the producer side
    * `crypto_key_reader`:
      Symmetric encryption class implementation, configuring public key encryption messages for the producer
      and private key decryption messages for the consumer
    """
    # Validate argument types before touching the native configuration.
    _check_type(str, topic, 'topic')
    _check_type_or_none(str, producer_name, 'producer_name')
    _check_type(_schema.Schema, schema, 'schema')
    _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
    _check_type(int, send_timeout_millis, 'send_timeout_millis')
    _check_type(CompressionType, compression_type, 'compression_type')
    _check_type(int, max_pending_messages, 'max_pending_messages')
    _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
    _check_type(bool, block_if_queue_full, 'block_if_queue_full')
    _check_type(bool, batching_enabled, 'batching_enabled')
    _check_type(int, batching_max_messages, 'batching_max_messages')
    _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
    _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
    _check_type_or_none(dict, properties, 'properties')
    _check_type(BatchingType, batching_type, 'batching_type')
    _check_type_or_none(str, encryption_key, 'encryption_key')
    _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
    _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')

    conf = _pulsar.ProducerConfiguration()
    conf.send_timeout_millis(send_timeout_millis)
    conf.compression_type(compression_type)
    conf.max_pending_messages(max_pending_messages)
    conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
    conf.block_if_queue_full(block_if_queue_full)
    conf.batching_enabled(batching_enabled)
    conf.batching_max_messages(batching_max_messages)
    conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
    conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
    conf.partitions_routing_mode(message_routing_mode)
    conf.batching_type(batching_type)
    conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
    if producer_name:
        conf.producer_name(producer_name)
    # Compare against None rather than truthiness: 0 is a legitimate baseline
    # and must not be silently dropped.
    if initial_sequence_id is not None:
        conf.initial_sequence_id(initial_sequence_id)
    if properties:
        for k, v in properties.items():
            conf.property(k, v)
    conf.schema(schema.schema_info())
    if encryption_key:
        conf.encryption_key(encryption_key)
    if crypto_key_reader:
        # The native reader object is what the configuration expects.
        conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

    p = Producer()
    p._producer = self._client.create_producer(topic, conf)
    p._schema = schema
    p._client = self._client
    return p
def subscribe(self, topic, subscription_name,
consumer_type=ConsumerType.Exclusive,
schema=schema.BytesSchema(),
message_listener=None,
receiver_queue_size=1000,
max_total_receiver_queue_size_across_partitions=50000,
consumer_name=None,
unacked_messages_timeout_ms=None,
broker_consumer_stats_cache_time_ms=30000,
negative_ack_redelivery_delay_ms=60000,
is_read_compacted=False,
properties=None,
pattern_auto_discovery_period=60,
initial_position=InitialPosition.Latest,
crypto_key_reader=None,
replicate_subscription_state_enabled=False
):
"""
Subscribe to the given topic and subscription combination.
**Args**
* `topic`: The name of the topic, list of topics or regex pattern.
This method will accept these forms:
- `topic='my-topic'`
- `topic=['topic-1', 'topic-2', 'topic-3']`
- `topic=re.compile('persistent://public/default/topic-*')`
* `subscription`: The name of the subscription.
**Options**
* `consumer_type`:
Select the subscription type to be used when subscribing to the topic.
* `schema`:
Define the schema of the data that will be received by this consumer.
* `message_listener`:
Sets a message listener for the consumer. When the listener is set,
the application will receive messages through it. Calls to
`consumer.receive()` will not be allowed. The listener function needs
to accept (consumer, message), for example:
#!python
def my_listener(consumer, message):
# process message
consumer.acknowledge(message)
* `receiver_queue_size`:
Sets the size of the consumer receive queue. The consumer receive
queue controls how many messages can be accumulated by the consumer
before the application calls `receive()`. Using a higher value could
potentially increase the consumer throughput at the expense of higher
memory utilization. Setting the consumer queue size to zero decreases
the throughput of the consumer by disabling pre-fetching of messages.
This approach improves the message distribution on shared subscription
by pushing messages only to those consumers that are ready to process
them. Neither receive with timeout nor partitioned topics can be used
if the consumer queue size is zero. The `receive()` function call
should not be interrupted when the consumer queue size is zero. The
default value is 1000 messages and should work well for most use
cases.
* `max_total_receiver_queue_size_across_partitions`
Set the max total receiver queue size across partitions.
This setting will be used to reduce the receiver queue size for individual partitions
* `consumer_name`:
Sets the consumer name.
* `unacked_messages_timeout_ms`:
Sets the timeout in milliseconds for unacknowledged messages. The
timeout needs to be greater than 10 seconds. An exception is thrown if
the given value is less than 10 seconds. If a successful
acknowledgement is not sent within the timeout, all the unacknowledged
messages are redelivered.
* `negative_ack_redelivery_delay_ms`:
The delay after which to redeliver the messages that failed to be
processed (with the `consumer.negative_acknowledge()`)
* `broker_consumer_stats_cache_time_ms`:
Sets the time duration for which the broker-side consumer stats will
be cached in the client.
* `is_read_compacted`:
Selects whether to read the compacted version of the topic
* `properties`:
Sets the properties for the consumer. The properties associated with a consumer
can be used for identify a consumer at broker side.
* `pattern_auto_discovery_period`:
Periods of seconds for consumer to auto discover match topics.
* `initial_position`:
Set the initial position of a consumer when subscribing to the topic.
It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
Default: `Latest`.
* crypto_key_reader:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
* replicate_subscription_state_enabled:
Set whether the subscription status should be replicated.
Default: `False`.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
_check_type(_schema.Schema, schema, 'schema')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
_check_type(int, max_total_receiver_queue_size_across_partitions,
'max_total_receiver_queue_size_across_partitions')
_check_type_or_none(str, consumer_name, 'consumer_name')
_check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
_check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
_check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
_check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(dict, properties, 'properties')
_check_type(InitialPosition, initial_position, 'initial_position')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
conf.read_compacted(is_read_compacted)
if message_listener:
conf.message_listener(_listener_wrapper(message_listener, schema))
conf.receiver_queue_size(receiver_queue_size)
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
if consumer_name:
conf.consumer_name(consumer_name)
if unacked_messages_timeout_ms:
conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
if properties:
for k, v in properties.items():
conf.property(k, v)
conf.subscription_initial_position(initial_position)
conf.schema(schema.schema_info())
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
c = Consumer()
if isinstance(topic, str):
# Single topic
c._consumer = self._client.subscribe(topic, subscription_name, conf)
elif isinstance(topic, list):
# List of topics
c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
elif isinstance(topic, _retype):
# Regex pattern
c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
else:
raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
c._client = self
c._schema = schema
self._consumers.append(c)
return c
def create_reader(self, topic, start_message_id,
schema=schema.BytesSchema(),
reader_listener=None,
receiver_queue_size=1000,
reader_name=None,
subscription_role_prefix=None,
is_read_compacted=False,
crypto_key_reader=None
):
"""
Create a reader on a particular topic
**Args**
* `topic`: The name of the topic.
* `start_message_id`: The initial reader positioning is done by specifying a message id.
The options are:
* `MessageId.earliest`: Start reading from the earliest message available in the topic
* `MessageId.latest`: Start reading from the end topic, only getting messages published
after the reader was created
* `MessageId`: When passing a particular message id, the reader will position itself on
that specific position. The first message to be read will be the message next to the
specified messageId. Message id can be serialized into a string and deserialized
back into a `MessageId` object:
# Serialize to string
s = msg.message_id().serialize()
# Deserialize from string
msg_id = MessageId.deserialize(s)
**Options**
* `schema`:
Define the schema of the data that will be received by this reader.
* `reader_listener`:
Sets a message listener for the reader. When the listener is set,
the application will receive messages through it. Calls to
`reader.read_next()` will not be allowed. The listener function needs
to accept (reader, message), for example:
def my_listener(reader, message):
# process message
pass
* `receiver_queue_size`:
Sets the size of the reader receive queue. The reader receive
queue controls how many messages can be accumulated by the reader
before the application calls `read_next()`. Using a higher value could
potentially increase the reader throughput at the expense of higher
memory utilization.
* `reader_name`:
Sets the reader name.
* `subscription_role_prefix`:
Sets the subscription role prefix.
* `is_read_compacted`:
Selects whether to read the compacted version of the topic
* crypto_key_reader:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
"""
_check_type(str, topic, 'topic')
_check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
_check_type(_schema.Schema, schema, 'schema')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
_check_type_or_none(str, reader_name, 'reader_name')
_check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
conf = _pulsar.ReaderConfiguration()
if reader_listener:
conf.reader_listener(_listener_wrapper(reader_listener, schema))
conf.receiver_queue_size(receiver_queue_size)
if reader_name:
conf.reader_name(reader_name)
if subscription_role_prefix:
conf.subscription_role_prefix(subscription_role_prefix)
conf.schema(schema.schema_info())
conf.read_compacted(is_read_compacted)
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
c._client = self
c._schema = schema
self._consumers.append(c)
return c
def get_topic_partitions(self, topic):
"""
Get the list of partitions for a given topic.
If the topic is partitioned, this will return a list of partition names. If the topic is not
partitioned, the returned list will contain the topic name itself.
This can be used to discover the partitions and create Reader, Consumer or Producer
instances directly on a particular partition.
:param topic: the topic name to lookup
:return: a list of partition name
"""
_check_type(str, topic, 'topic')
return self._client.get_topic_partitions(topic)
def shutdown(self):
"""
Perform immediate shutdown of Pulsar client.
Release all resources and close all producer, consumer, and readers without waiting
for ongoing operations to complete.
"""
self._client.shutdown()
def close(self):
"""
Close the client and all the associated producers and consumers
"""
self._client.close()
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.Client">Client</a></li>
</ul>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.Client.__init__">
<p>def <span class="ident">__init__</span>(</p><p>self, service_url, authentication=None, operation_timeout_seconds=30, io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000, log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, tls_validate_hostname=False, logger=None, connection_timeout_ms=10000)</p>
</div>
<div class="desc"><p>Create a new Pulsar client instance.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>service_url</code>: The Pulsar service URL, e.g. <code>pulsar://my-broker.com:6650/</code></li>
</ul>
<p><strong>Options</strong></p>
<ul>
<li><code>authentication</code>:
Set the authentication provider to be used with the broker. For example:
<code>AuthenticationTls</code>, <code>AuthenticationToken</code>, <code>AuthenticationAthenz</code> or <code>AuthenticationOauth2</code></li>
<li><code>operation_timeout_seconds</code>:
Set timeout on client operations (subscribe, create producer, close,
unsubscribe).</li>
<li><code>io_threads</code>:
Set the number of IO threads to be used by the Pulsar client.</li>
<li><code>message_listener_threads</code>:
Set the number of threads to be used by the Pulsar client when
delivering messages through message listener. The default is 1 thread
per Pulsar client. If using more than 1 thread, messages for distinct
<code>message_listener</code>s will be delivered in different threads, however a
single <code>MessageListener</code> will always be assigned to the same thread.</li>
<li><code>concurrent_lookup_requests</code>:
Number of concurrent lookup-requests allowed on each broker connection
to prevent overload on the broker.</li>
<li><code>log_conf_file_path</code>:
Initialize log4cxx from a configuration file.</li>
<li><code>use_tls</code>:
Configure whether to use TLS encryption on the connection. This setting
is deprecated. TLS will be automatically enabled if <code>service_url</code>
starts with <code>pulsar+ssl://</code> or <code>https://</code>.</li>
<li><code>tls_trust_certs_file_path</code>:
Set the path to the trusted TLS certificate file. If not set, defaults to
the CA bundle provided by the <code>certifi</code> package.</li>
<li><code>tls_allow_insecure_connection</code>:
Configure whether the Pulsar client accepts untrusted TLS certificates
from the broker.</li>
<li><code>tls_validate_hostname</code>:
Configure whether the Pulsar client validates that the hostname of the
endpoint matches the common name on the TLS certificate presented by
the endpoint.</li>
<li><code>logger</code>:
Set a Python logger for this Pulsar client. Should be an instance of <code>logging.Logger</code>.</li>
<li><code>connection_timeout_ms</code>:
Set timeout in milliseconds on TCP connections.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.__init__', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Client.__init__" class="source">
<pre><code>def __init__(self, service_url,
authentication=None,
operation_timeout_seconds=30,
io_threads=1,
message_listener_threads=1,
concurrent_lookup_requests=50000,
log_conf_file_path=None,
use_tls=False,
tls_trust_certs_file_path=None,
tls_allow_insecure_connection=False,
tls_validate_hostname=False,
logger=None,
connection_timeout_ms=10000,
):
"""
Create a new Pulsar client instance.
**Args**
* `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
**Options**
* `authentication`:
Set the authentication provider to be used with the broker. For example:
`AuthenticationTls`, AuthenticaionToken, `AuthenticationAthenz`or `AuthenticationOauth2`
* `operation_timeout_seconds`:
Set timeout on client operations (subscribe, create producer, close,
unsubscribe).
* `io_threads`:
Set the number of IO threads to be used by the Pulsar client.
* `message_listener_threads`:
Set the number of threads to be used by the Pulsar client when
delivering messages through message listener. The default is 1 thread
per Pulsar client. If using more than 1 thread, messages for distinct
`message_listener`s will be delivered in different threads, however a
single `MessageListener` will always be assigned to the same thread.
* `concurrent_lookup_requests`:
Number of concurrent lookup-requests allowed on each broker connection
to prevent overload on the broker.
* `log_conf_file_path`:
Initialize log4cxx from a configuration file.
* `use_tls`:
Configure whether to use TLS encryption on the connection. This setting
is deprecated. TLS will be automatically enabled if the `serviceUrl` is
set to `pulsar+ssl://` or `https://`
* `tls_trust_certs_file_path`:
Set the path to the trusted TLS certificate file. If empty defaults to
certifi.
* `tls_allow_insecure_connection`:
Configure whether the Pulsar client accepts untrusted TLS certificates
from the broker.
* `tls_validate_hostname`:
Configure whether the Pulsar client validates that the hostname of the
endpoint, matches the common name on the TLS certificate presented by
the endpoint.
* `logger`:
Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
* `connection_timeout_ms`:
Set timeout in milliseconds on TCP connections.
"""
_check_type(str, service_url, 'service_url')
_check_type_or_none(Authentication, authentication, 'authentication')
_check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
_check_type(int, connection_timeout_ms, 'connection_timeout_ms')
_check_type(int, io_threads, 'io_threads')
_check_type(int, message_listener_threads, 'message_listener_threads')
_check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
_check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
_check_type(bool, use_tls, 'use_tls')
_check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
_check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
_check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
_check_type_or_none(logging.Logger, logger, 'logger')
conf = _pulsar.ClientConfiguration()
if authentication:
conf.authentication(authentication.auth)
conf.operation_timeout_seconds(operation_timeout_seconds)
conf.connection_timeout(connection_timeout_ms)
conf.io_threads(io_threads)
conf.message_listener_threads(message_listener_threads)
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
if logger:
conf.set_logger(logger)
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
conf.use_tls(True)
if tls_trust_certs_file_path:
conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
else:
conf.tls_trust_certs_file_path(certifi.where())
conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
conf.tls_validate_hostname(tls_validate_hostname)
self._client = _pulsar.Client(service_url, conf)
self._consumers = []
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Client.close">
<p>def <span class="ident">close</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Close the client and all the associated producers and consumers</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.close', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Client.close" class="source">
<pre><code>def close(self):
"""
Close the client and all the associated producers and consumers
"""
self._client.close()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Client.create_producer">
<p>def <span class="ident">create_producer</span>(</p><p>self, topic, producer_name=None, schema=schema.BytesSchema(), initial_sequence_id=None, send_timeout_millis=30000, compression_type=_pulsar.CompressionType.NONE, max_pending_messages=1000, max_pending_messages_across_partitions=50000, block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000, batching_max_allowed_size_in_bytes=131072, batching_max_publish_delay_ms=10, message_routing_mode=_pulsar.PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers=False, properties=None, batching_type=_pulsar.BatchingType.Default, encryption_key=None, crypto_key_reader=None)</p>
</div>
<div class="desc"><p>Create a new producer on a given topic.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>topic</code>:
The topic name</li>
</ul>
<p><strong>Options</strong></p>
<ul>
<li><code>producer_name</code>:
Specify a name for the producer. If not assigned,
the system will generate a globally unique name which can be accessed
with <code>Producer.producer_name()</code>. When specifying a name, it is up to
the user to ensure that, for a given topic, the producer name is unique
across all Pulsar clusters.</li>
<li><code>schema</code>:
Define the schema of the data that will be published by this producer.
The schema will be used for two purposes:<ul>
<li>Validate the data format against the topic defined schema</li>
<li>Perform serialization/deserialization between data and objects</li>
</ul>
An example for this parameter would be to pass <code>schema=JsonSchema(MyRecordClass)</code>.
</li>
<li><code>initial_sequence_id</code>:
Set the baseline for the sequence ids for messages
published by the producer. First message will be using
<code>(initial_sequence_id + 1)</code> as its sequence id and subsequent messages will
be assigned incremental sequence ids, if not otherwise specified.</li>
<li><code>send_timeout_millis</code>:
If a message is not acknowledged by the server before the
<code>send_timeout_millis</code> timeout expires, an error will be reported.</li>
<li><code>compression_type</code>:
Set the compression type for the producer. By default, message
payloads are not compressed. Supported compression types are
<code>CompressionType.LZ4</code>, <code>CompressionType.ZLib</code>, <code>CompressionType.ZSTD</code> and <code>CompressionType.SNAPPY</code>.
ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
release in order to be able to receive messages compressed with ZSTD.
SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
release in order to be able to receive messages compressed with SNAPPY.</li>
<li><code>max_pending_messages</code>:
Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker.</li>
<li><code>max_pending_messages_across_partitions</code>:
Set the max size of the queue holding the messages pending to receive
an acknowledgment across partitions from the broker.</li>
<li><code>block_if_queue_full</code>: Set whether <code>send_async</code> operations should
block when the outgoing message queue is full.</li>
<li><code>message_routing_mode</code>:
Set the message routing mode for the partitioned producer. Default is <code>PartitionsRoutingMode.RoundRobinDistribution</code>,
other option is <code>PartitionsRoutingMode.UseSinglePartition</code></li>
<li><code>lazy_start_partitioned_producers</code>:
This config affects producers of partitioned topics only. It controls whether
producers register and connect immediately to the owner broker of each partition
or start lazily on demand. The internal producer of one partition is always
started eagerly, chosen by the routing policy, but the internal producers of
any additional partitions are started on demand, upon receiving their first
message.
Using this mode can reduce the strain on brokers for topics with large numbers of
partitions and when the SinglePartition routing policy is used without keyed messages.
Because producer connection can be on demand, this can produce extra send latency
for the first messages of a given partition.</li>
<li><code>properties</code>:
Sets the properties for the producer. The properties associated with a producer
can be used to identify a producer on the broker side.</li>
<li>
<p><code>batching_type</code>:
Sets the batching type for the producer.
There are two batching types: DefaultBatching and KeyBasedBatching.</p>
<ul>
<li>
<p>Default batching
incoming single messages:
(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
batched into single batch message:
[(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]</p>
</li>
<li>
<p>KeyBasedBatching
incoming single messages:
(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
batched into single batch message:
[(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]</p>
</li>
</ul>
</li>
<li><code>encryption_key</code>:
The key used for symmetric encryption, configured on the producer side</li>
<li><code>crypto_key_reader</code>:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.create_producer', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Client.create_producer" class="source">
<pre><code>def create_producer(self, topic,
producer_name=None,
schema=schema.BytesSchema(),
initial_sequence_id=None,
send_timeout_millis=30000,
compression_type=CompressionType.NONE,
max_pending_messages=1000,
max_pending_messages_across_partitions=50000,
block_if_queue_full=False,
batching_enabled=False,
batching_max_messages=1000,
batching_max_allowed_size_in_bytes=128*1024,
batching_max_publish_delay_ms=10,
message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
lazy_start_partitioned_producers=False,
properties=None,
batching_type=BatchingType.Default,
encryption_key=None,
crypto_key_reader=None
):
"""
Create a new producer on a given topic.
**Args**
* `topic`:
The topic name
**Options**
* `producer_name`:
Specify a name for the producer. If not assigned,
the system will generate a globally unique name which can be accessed
with `Producer.producer_name()`. When specifying a name, it is app to
the user to ensure that, for a given topic, the producer name is unique
across all Pulsar's clusters.
* `schema`:
Define the schema of the data that will be published by this producer.
The schema will be used for two purposes:
- Validate the data format against the topic defined schema
- Perform serialization/deserialization between data and objects
An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
* `initial_sequence_id`:
Set the baseline for the sequence ids for messages
published by the producer. First message will be using
`(initialSequenceId + 1)`` as its sequence id and subsequent messages will
be assigned incremental sequence ids, if not otherwise specified.
* `send_timeout_millis`:
If a message is not acknowledged by the server before the
`send_timeout` expires, an error will be reported.
* `compression_type`:
Set the compression type for the producer. By default, message
payloads are not compressed. Supported compression types are
`CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
release in order to be able to receive messages compressed with ZSTD.
SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
release in order to be able to receive messages compressed with SNAPPY.
* `max_pending_messages`:
Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker.
* `max_pending_messages_across_partitions`:
Set the max size of the queue holding the messages pending to receive
an acknowledgment across partitions from the broker.
* `block_if_queue_full`: Set whether `send_async` operations should
block when the outgoing message queue is full.
* `message_routing_mode`:
Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
other option is `PartitionsRoutingMode.UseSinglePartition`
* `lazy_start_partitioned_producers`:
This config affects producers of partitioned topics only. It controls whether
producers register and connect immediately to the owner broker of each partition
or start lazily on demand. The internal producer of one partition is always
started eagerly, chosen by the routing policy, but the internal producers of
any additional partitions are started on demand, upon receiving their first
message.
Using this mode can reduce the strain on brokers for topics with large numbers of
partitions and when the SinglePartition routing policy is used without keyed messages.
Because producer connection can be on demand, this can produce extra send latency
for the first messages of a given partition.
* `properties`:
Sets the properties for the producer. The properties associated with a producer
can be used for identify a producer at broker side.
* `batching_type`:
Sets the batching type for the producer.
There are two batching type: DefaultBatching and KeyBasedBatching.
- Default batching
incoming single messages:
(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
batched into single batch message:
[(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
- KeyBasedBatching
incoming single messages:
(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
batched into single batch message:
[(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
* encryption_key:
The key used for symmetric encryption, configured on the producer side
* crypto_key_reader:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
"""
_check_type(str, topic, 'topic')
_check_type_or_none(str, producer_name, 'producer_name')
_check_type(_schema.Schema, schema, 'schema')
_check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
_check_type(int, send_timeout_millis, 'send_timeout_millis')
_check_type(CompressionType, compression_type, 'compression_type')
_check_type(int, max_pending_messages, 'max_pending_messages')
_check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
_check_type(bool, block_if_queue_full, 'block_if_queue_full')
_check_type(bool, batching_enabled, 'batching_enabled')
_check_type(int, batching_max_messages, 'batching_max_messages')
_check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
_check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
_check_type_or_none(dict, properties, 'properties')
_check_type(BatchingType, batching_type, 'batching_type')
_check_type_or_none(str, encryption_key, 'encryption_key')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
_check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')
conf = _pulsar.ProducerConfiguration()
conf.send_timeout_millis(send_timeout_millis)
conf.compression_type(compression_type)
conf.max_pending_messages(max_pending_messages)
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
conf.block_if_queue_full(block_if_queue_full)
conf.batching_enabled(batching_enabled)
conf.batching_max_messages(batching_max_messages)
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
conf.partitions_routing_mode(message_routing_mode)
conf.batching_type(batching_type)
conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
if producer_name:
conf.producer_name(producer_name)
if initial_sequence_id:
conf.initial_sequence_id(initial_sequence_id)
if properties:
for k, v in properties.items():
conf.property(k, v)
conf.schema(schema.schema_info())
if encryption_key:
conf.encryption_key(encryption_key)
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
p = Producer()
p._producer = self._client.create_producer(topic, conf)
p._schema = schema
p._client = self._client
return p
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Client.create_reader">
<p>def <span class="ident">create_reader</span>(</p><p>self, topic, start_message_id, schema=schema.BytesSchema(), reader_listener=None, receiver_queue_size=1000, reader_name=None, subscription_role_prefix=None, is_read_compacted=False, crypto_key_reader=None)</p>
</div>
<div class="desc"><p>Create a reader on a particular topic</p>
<p><strong>Args</strong></p>
<ul>
<li><code>topic</code>: The name of the topic.</li>
<li><code>start_message_id</code>: The initial reader positioning is done by specifying a message id.
The options are:<ul>
<li><code>MessageId.earliest</code>: Start reading from the earliest message available in the topic</li>
<li><code>MessageId.latest</code>: Start reading from the end of the topic, only getting messages published
after the reader was created</li>
<li>
<p><code>MessageId</code>: When passing a particular message id, the reader will position itself on
that specific position. The first message to be read will be the message next to the
specified messageId. Message id can be serialized into a string and deserialized
back into a <code>MessageId</code> object:</p>
<pre><code># Serialize to string
s = msg.message_id().serialize()

# Deserialize from string
msg_id = MessageId.deserialize(s)
</code></pre>
</li>
</ul>
</li>
</ul>
<p><strong>Options</strong></p>
<ul>
<li><code>schema</code>:
Define the schema of the data that will be received by this reader.</li>
<li>
<p><code>reader_listener</code>:
Sets a message listener for the reader. When the listener is set,
the application will receive messages through it. Calls to
<code>reader.read_next()</code> will not be allowed. The listener function needs
to accept (reader, message), for example:</p>
<pre><code>def my_listener(reader, message):
# process message
pass
</code></pre>
</li>
<li>
<p><code>receiver_queue_size</code>:
Sets the size of the reader receive queue. The reader receive
queue controls how many messages can be accumulated by the reader
before the application calls <code>read_next()</code>. Using a higher value could
potentially increase the reader throughput at the expense of higher
memory utilization.</p>
</li>
<li><code>reader_name</code>:
Sets the reader name.</li>
<li><code>subscription_role_prefix</code>:
Sets the subscription role prefix.</li>
<li><code>is_read_compacted</code>:
Selects whether to read the compacted version of the topic</li>
<li><code>crypto_key_reader</code>:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.create_reader', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Client.create_reader" class="source">
<pre><code>def create_reader(self, topic, start_message_id,
schema=schema.BytesSchema(),
reader_listener=None,
receiver_queue_size=1000,
reader_name=None,
subscription_role_prefix=None,
is_read_compacted=False,
crypto_key_reader=None
):
"""
Create a reader on a particular topic
**Args**
* `topic`: The name of the topic.
* `start_message_id`: The initial reader positioning is done by specifying a message id.
The options are:
* `MessageId.earliest`: Start reading from the earliest message available in the topic
* `MessageId.latest`: Start reading from the end topic, only getting messages published
after the reader was created
* `MessageId`: When passing a particular message id, the reader will position itself on
that specific position. The first message to be read will be the message next to the
specified messageId. Message id can be serialized into a string and deserialized
back into a `MessageId` object:
# Serialize to string
s = msg.message_id().serialize()
# Deserialize from string
msg_id = MessageId.deserialize(s)
**Options**
* `schema`:
Define the schema of the data that will be received by this reader.
* `reader_listener`:
Sets a message listener for the reader. When the listener is set,
the application will receive messages through it. Calls to
`reader.read_next()` will not be allowed. The listener function needs
to accept (reader, message), for example:
def my_listener(reader, message):
# process message
pass
* `receiver_queue_size`:
Sets the size of the reader receive queue. The reader receive
queue controls how many messages can be accumulated by the reader
before the application calls `read_next()`. Using a higher value could
potentially increase the reader throughput at the expense of higher
memory utilization.
* `reader_name`:
Sets the reader name.
* `subscription_role_prefix`:
Sets the subscription role prefix.
* `is_read_compacted`:
Selects whether to read the compacted version of the topic
* crypto_key_reader:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
"""
_check_type(str, topic, 'topic')
_check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
_check_type(_schema.Schema, schema, 'schema')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
_check_type_or_none(str, reader_name, 'reader_name')
_check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
conf = _pulsar.ReaderConfiguration()
if reader_listener:
conf.reader_listener(_listener_wrapper(reader_listener, schema))
conf.receiver_queue_size(receiver_queue_size)
if reader_name:
conf.reader_name(reader_name)
if subscription_role_prefix:
conf.subscription_role_prefix(subscription_role_prefix)
conf.schema(schema.schema_info())
conf.read_compacted(is_read_compacted)
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
c._client = self
c._schema = schema
self._consumers.append(c)
return c
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Client.get_topic_partitions">
<p>def <span class="ident">get_topic_partitions</span>(</p><p>self, topic)</p>
</div>
<div class="desc"><p>Get the list of partitions for a given topic.</p>
<p>If the topic is partitioned, this will return a list of partition names. If the topic is not
partitioned, the returned list will contain the topic name itself.</p>
<p>This can be used to discover the partitions and create Reader, Consumer or Producer
instances directly on a particular partition.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>topic</code>: The topic name to look up.</li>
</ul>
<p><strong>Returns</strong></p>
<p>A list of partition names.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.get_topic_partitions', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Client.get_topic_partitions" class="source">
<pre><code>def get_topic_partitions(self, topic):
"""
Get the list of partitions for a given topic.
If the topic is partitioned, this will return a list of partition names. If the topic is not
partitioned, the returned list will contain the topic name itself.
This can be used to discover the partitions and create Reader, Consumer or Producer
instances directly on a particular partition.
:param topic: the topic name to lookup
:return: a list of partition name
"""
_check_type(str, topic, 'topic')
return self._client.get_topic_partitions(topic)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Client.shutdown">
<p>def <span class="ident">shutdown</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Perform immediate shutdown of Pulsar client.</p>
<p>Release all resources and close all producers, consumers, and readers without waiting
for ongoing operations to complete.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.shutdown', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Client.shutdown" class="source">
<pre><code>def shutdown(self):
"""
Perform immediate shutdown of Pulsar client.
Release all resources and close all producer, consumer, and readers without waiting
for ongoing operations to complete.
"""
self._client.shutdown()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Client.subscribe">
<p>def <span class="ident">subscribe</span>(</p><p>self, topic, subscription_name, consumer_type=_pulsar.ConsumerType.Exclusive, schema=schema.BytesSchema(), message_listener=None, receiver_queue_size=1000, max_total_receiver_queue_size_across_partitions=50000, consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, negative_ack_redelivery_delay_ms=60000, is_read_compacted=False, properties=None, pattern_auto_discovery_period=60, initial_position=_pulsar.InitialPosition.Latest, crypto_key_reader=None, replicate_subscription_state_enabled=False)</p>
</div>
<div class="desc"><p>Subscribe to the given topic and subscription combination.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>topic</code>: The name of the topic, list of topics or regex pattern.
This method will accept these forms:
<ul>
<li><code>topic='my-topic'</code></li>
<li><code>topic=['topic-1', 'topic-2', 'topic-3']</code></li>
<li><code>topic=re.compile('persistent://public/default/topic-*')</code></li>
</ul>
</li>
<li><code>subscription_name</code>: The name of the subscription.</li>
</ul>
<p><strong>Options</strong></p>
<ul>
<li><code>consumer_type</code>:
Select the subscription type to be used when subscribing to the topic.</li>
<li><code>schema</code>:
Define the schema of the data that will be received by this consumer.</li>
<li>
<p><code>message_listener</code>:
Sets a message listener for the consumer. When the listener is set,
the application will receive messages through it. Calls to
<code>consumer.receive()</code> will not be allowed. The listener function needs
to accept (consumer, message), for example:</p>
<pre><code>def my_listener(consumer, message):
    # process message
    consumer.acknowledge(message)
</code></pre>
</li>
<li>
<p><code>receiver_queue_size</code>:
Sets the size of the consumer receive queue. The consumer receive
queue controls how many messages can be accumulated by the consumer
before the application calls <code>receive()</code>. Using a higher value could
potentially increase the consumer throughput at the expense of higher
memory utilization. Setting the consumer queue size to zero decreases
the throughput of the consumer by disabling pre-fetching of messages.
This approach improves the message distribution on shared subscription
by pushing messages only to those consumers that are ready to process
them. Neither receive with timeout nor partitioned topics can be used
if the consumer queue size is zero. The <code>receive()</code> function call
should not be interrupted when the consumer queue size is zero. The
default value is 1000 messages and should work well for most use
cases.</p>
</li>
<li><code>max_total_receiver_queue_size_across_partitions</code>:
Sets the max total receiver queue size across partitions.
This setting will be used to reduce the receiver queue size for individual partitions.</li>
<li><code>consumer_name</code>:
Sets the consumer name.</li>
<li><code>unacked_messages_timeout_ms</code>:
Sets the timeout in milliseconds for unacknowledged messages. The
timeout needs to be greater than 10 seconds. An exception is thrown if
the given value is less than 10 seconds. If a successful
acknowledgement is not sent within the timeout, all the unacknowledged
messages are redelivered.</li>
<li><code>negative_ack_redelivery_delay_ms</code>:
The delay after which to redeliver the messages that failed to be
processed (with the <code>consumer.negative_acknowledge()</code>)</li>
<li><code>broker_consumer_stats_cache_time_ms</code>:
Sets the time duration for which the broker-side consumer stats will
be cached in the client.</li>
<li><code>is_read_compacted</code>:
Selects whether to read the compacted version of the topic</li>
<li><code>properties</code>:
Sets the properties for the consumer. The properties associated with a consumer
can be used to identify a consumer on the broker side.</li>
<li><code>pattern_auto_discovery_period</code>:
Period, in seconds, at which the consumer auto-discovers matching topics.</li>
<li><code>initial_position</code>:
Set the initial position of a consumer when subscribing to the topic.
It could be either: <code>InitialPosition.Earliest</code> or <code>InitialPosition.Latest</code>.
Default: <code>Latest</code>.</li>
<li><code>crypto_key_reader</code>:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer.</li>
<li><code>replicate_subscription_state_enabled</code>:
Sets whether the subscription state should be replicated.
Default: <code>False</code>.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.subscribe', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Client.subscribe" class="source">
<pre><code>def subscribe(self, topic, subscription_name,
consumer_type=ConsumerType.Exclusive,
schema=schema.BytesSchema(),
message_listener=None,
receiver_queue_size=1000,
max_total_receiver_queue_size_across_partitions=50000,
consumer_name=None,
unacked_messages_timeout_ms=None,
broker_consumer_stats_cache_time_ms=30000,
negative_ack_redelivery_delay_ms=60000,
is_read_compacted=False,
properties=None,
pattern_auto_discovery_period=60,
initial_position=InitialPosition.Latest,
crypto_key_reader=None,
replicate_subscription_state_enabled=False
):
"""
Subscribe to the given topic and subscription combination.
**Args**
* `topic`: The name of the topic, list of topics or regex pattern.
This method will accept these forms:
- `topic='my-topic'`
- `topic=['topic-1', 'topic-2', 'topic-3']`
- `topic=re.compile('persistent://public/default/topic-*')`
* `subscription`: The name of the subscription.
**Options**
* `consumer_type`:
Select the subscription type to be used when subscribing to the topic.
* `schema`:
Define the schema of the data that will be received by this consumer.
* `message_listener`:
Sets a message listener for the consumer. When the listener is set,
the application will receive messages through it. Calls to
`consumer.receive()` will not be allowed. The listener function needs
to accept (consumer, message), for example:
#!python
def my_listener(consumer, message):
# process message
consumer.acknowledge(message)
* `receiver_queue_size`:
Sets the size of the consumer receive queue. The consumer receive
queue controls how many messages can be accumulated by the consumer
before the application calls `receive()`. Using a higher value could
potentially increase the consumer throughput at the expense of higher
memory utilization. Setting the consumer queue size to zero decreases
the throughput of the consumer by disabling pre-fetching of messages.
This approach improves the message distribution on shared subscription
by pushing messages only to those consumers that are ready to process
them. Neither receive with timeout nor partitioned topics can be used
if the consumer queue size is zero. The `receive()` function call
should not be interrupted when the consumer queue size is zero. The
default value is 1000 messages and should work well for most use
cases.
* `max_total_receiver_queue_size_across_partitions`
Set the max total receiver queue size across partitions.
This setting will be used to reduce the receiver queue size for individual partitions
* `consumer_name`:
Sets the consumer name.
* `unacked_messages_timeout_ms`:
Sets the timeout in milliseconds for unacknowledged messages. The
timeout needs to be greater than 10 seconds. An exception is thrown if
the given value is less than 10 seconds. If a successful
acknowledgement is not sent within the timeout, all the unacknowledged
messages are redelivered.
* `negative_ack_redelivery_delay_ms`:
The delay after which to redeliver the messages that failed to be
processed (with the `consumer.negative_acknowledge()`)
* `broker_consumer_stats_cache_time_ms`:
Sets the time duration for which the broker-side consumer stats will
be cached in the client.
* `is_read_compacted`:
Selects whether to read the compacted version of the topic
* `properties`:
Sets the properties for the consumer. The properties associated with a consumer
can be used for identify a consumer at broker side.
* `pattern_auto_discovery_period`:
Periods of seconds for consumer to auto discover match topics.
* `initial_position`:
Set the initial position of a consumer when subscribing to the topic.
It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
Default: `Latest`.
* crypto_key_reader:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
* replicate_subscription_state_enabled:
Set whether the subscription status should be replicated.
Default: `False`.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
_check_type(_schema.Schema, schema, 'schema')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
_check_type(int, max_total_receiver_queue_size_across_partitions,
'max_total_receiver_queue_size_across_partitions')
_check_type_or_none(str, consumer_name, 'consumer_name')
_check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
_check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
_check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
_check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(dict, properties, 'properties')
_check_type(InitialPosition, initial_position, 'initial_position')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
conf.read_compacted(is_read_compacted)
if message_listener:
conf.message_listener(_listener_wrapper(message_listener, schema))
conf.receiver_queue_size(receiver_queue_size)
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
if consumer_name:
conf.consumer_name(consumer_name)
if unacked_messages_timeout_ms:
conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
if properties:
for k, v in properties.items():
conf.property(k, v)
conf.subscription_initial_position(initial_position)
conf.schema(schema.schema_info())
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
c = Consumer()
if isinstance(topic, str):
# Single topic
c._consumer = self._client.subscribe(topic, subscription_name, conf)
elif isinstance(topic, list):
# List of topics
c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
elif isinstance(topic, _retype):
# Regex pattern
c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
else:
raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
c._client = self
c._schema = schema
self._consumers.append(c)
return c
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.Consumer" class="name">class <span class="ident">Consumer</span></p>
<div class="desc"><p>Pulsar consumer.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer" class="source">
<pre><code>class Consumer:
"""
Pulsar consumer.
"""
def topic(self):
"""
Return the topic this consumer is subscribed to.
"""
return self._consumer.topic()
def subscription_name(self):
"""
Return the subscription name.
"""
return self._consumer.subscription_name()
def unsubscribe(self):
"""
Unsubscribe the current consumer from the topic.
This method will block until the operation is completed. Once the
consumer is unsubscribed, no more messages will be received and
subsequent new messages will not be retained for this consumer.
This consumer object cannot be reused.
"""
return self._consumer.unsubscribe()
def receive(self, timeout_millis=None):
"""
Receive a single message.
If a message is not immediately available, this method will block until
a new message is available.
**Options**
* `timeout_millis`:
If specified, the receive will raise an exception if a message is not
available within the timeout.
"""
if timeout_millis is None:
msg = self._consumer.receive()
else:
_check_type(int, timeout_millis, 'timeout_millis')
msg = self._consumer.receive(timeout_millis)
m = Message()
m._message = msg
m._schema = self._schema
return m
def acknowledge(self, message):
"""
Acknowledge the reception of a single message.
This method will block until an acknowledgement is sent to the broker.
After that, the message will not be re-delivered to this consumer.
**Args**
* `message`:
The received message or message id.
"""
if isinstance(message, Message):
self._consumer.acknowledge(message._message)
else:
self._consumer.acknowledge(message)
def acknowledge_cumulative(self, message):
"""
Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.
This method will block until an acknowledgement is sent to the broker.
After that, the messages will not be re-delivered to this consumer.
**Args**
* `message`:
The received message or message id.
"""
if isinstance(message, Message):
self._consumer.acknowledge_cumulative(message._message)
else:
self._consumer.acknowledge_cumulative(message)
def negative_acknowledge(self, message):
"""
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after
some fixed delay. The delay is configurable when constructing the consumer
with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
This call is not blocking.
**Args**
* `message`:
The received message or message id.
"""
if isinstance(message, Message):
self._consumer.negative_acknowledge(message._message)
else:
self._consumer.negative_acknowledge(message)
def pause_message_listener(self):
"""
Pause receiving messages via the `message_listener` until
`resume_message_listener()` is called.
"""
self._consumer.pause_message_listener()
def resume_message_listener(self):
"""
Resume receiving the messages via the message listener.
Asynchronously receive all the messages enqueued from the time
`pause_message_listener()` was called.
"""
self._consumer.resume_message_listener()
def redeliver_unacknowledged_messages(self):
"""
Redelivers all the unacknowledged messages. In failover mode, the
request is ignored if the consumer is not active for the given topic. In
shared mode, the consumer's messages to be redelivered are distributed
across all the connected consumers. This is a non-blocking call and
doesn't throw an exception. In case the connection breaks, the messages
are redelivered after reconnect.
"""
self._consumer.redeliver_unacknowledged_messages()
def seek(self, messageid):
"""
Reset the subscription associated with this consumer to a specific message id or publish timestamp.
The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
seek() on the individual partitions.
**Args**
* `message`:
The message id for seek, OR an integer event time to seek to
"""
self._consumer.seek(messageid)
def close(self):
"""
Close the consumer.
"""
self._consumer.close()
self._client._consumers.remove(self)
def is_connected(self):
"""
Check if the consumer is connected or not.
"""
return self._consumer.is_connected()
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.Consumer">Consumer</a></li>
</ul>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.Consumer.acknowledge">
<p>def <span class="ident">acknowledge</span>(</p><p>self, message)</p>
</div>
<div class="desc"><p>Acknowledge the reception of a single message.</p>
<p>This method will block until an acknowledgement is sent to the broker.
After that, the message will not be re-delivered to this consumer.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>message</code>:
The received message or message id.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.acknowledge', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.acknowledge" class="source">
<pre><code>def acknowledge(self, message):
"""
Acknowledge the reception of a single message.
This method will block until an acknowledgement is sent to the broker.
After that, the message will not be re-delivered to this consumer.
**Args**
* `message`:
The received message or message id.
"""
if isinstance(message, Message):
self._consumer.acknowledge(message._message)
else:
self._consumer.acknowledge(message)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.acknowledge_cumulative">
<p>def <span class="ident">acknowledge_cumulative</span>(</p><p>self, message)</p>
</div>
<div class="desc"><p>Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.</p>
<p>This method will block until an acknowledgement is sent to the broker.
After that, the messages will not be re-delivered to this consumer.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>message</code>:
The received message or message id.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.acknowledge_cumulative', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.acknowledge_cumulative" class="source">
<pre><code>def acknowledge_cumulative(self, message):
"""
Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.
This method will block until an acknowledgement is sent to the broker.
After that, the messages will not be re-delivered to this consumer.
**Args**
* `message`:
The received message or message id.
"""
if isinstance(message, Message):
self._consumer.acknowledge_cumulative(message._message)
else:
self._consumer.acknowledge_cumulative(message)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.close">
<p>def <span class="ident">close</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Close the consumer.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.close', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.close" class="source">
<pre><code>def close(self):
"""
Close the consumer.
"""
self._consumer.close()
self._client._consumers.remove(self)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.is_connected">
<p>def <span class="ident">is_connected</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Check if the consumer is connected or not.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.is_connected', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.is_connected" class="source">
<pre><code>def is_connected(self):
"""
Check if the consumer is connected or not.
"""
return self._consumer.is_connected()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.negative_acknowledge">
<p>def <span class="ident">negative_acknowledge</span>(</p><p>self, message)</p>
</div>
<div class="desc"><p>Acknowledge the failure to process a single message.</p>
<p>When a message is "negatively acked" it will be marked for redelivery after
some fixed delay. The delay is configurable when constructing the consumer
with the <code>negative_ack_redelivery_delay_ms</code> option of <code>Client.subscribe()</code>.</p>
<p>This call is not blocking.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>message</code>:
The received message or message id.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.negative_acknowledge', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.negative_acknowledge" class="source">
<pre><code>def negative_acknowledge(self, message):
"""
Acknowledge the failure to process a single message.
When a message is "negatively acked" it will be marked for redelivery after
some fixed delay. The delay is configurable when constructing the consumer
with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
This call is not blocking.
**Args**
* `message`:
The received message or message id.
"""
if isinstance(message, Message):
self._consumer.negative_acknowledge(message._message)
else:
self._consumer.negative_acknowledge(message)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.pause_message_listener">
<p>def <span class="ident">pause_message_listener</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Pause receiving messages via the <code>message_listener</code> until
<code>resume_message_listener()</code> is called.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.pause_message_listener', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.pause_message_listener" class="source">
<pre><code>def pause_message_listener(self):
"""
Pause receiving messages via the `message_listener` until
`resume_message_listener()` is called.
"""
self._consumer.pause_message_listener()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.receive">
<p>def <span class="ident">receive</span>(</p><p>self, timeout_millis=None)</p>
</div>
<div class="desc"><p>Receive a single message.</p>
<p>If a message is not immediately available, this method will block until
a new message is available.</p>
<p><strong>Options</strong></p>
<ul>
<li><code>timeout_millis</code>:
If specified, the receive will raise an exception if a message is not
available within the timeout.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.receive', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.receive" class="source">
<pre><code>def receive(self, timeout_millis=None):
"""
Receive a single message.
If a message is not immediately available, this method will block until
a new message is available.
**Options**
* `timeout_millis`:
If specified, the receive will raise an exception if a message is not
available within the timeout.
"""
if timeout_millis is None:
msg = self._consumer.receive()
else:
_check_type(int, timeout_millis, 'timeout_millis')
msg = self._consumer.receive(timeout_millis)
m = Message()
m._message = msg
m._schema = self._schema
return m
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.redeliver_unacknowledged_messages">
<p>def <span class="ident">redeliver_unacknowledged_messages</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Redelivers all the unacknowledged messages. In failover mode, the
request is ignored if the consumer is not active for the given topic. In
shared mode, the consumer's messages to be redelivered are distributed
across all the connected consumers. This is a non-blocking call and
doesn't throw an exception. In case the connection breaks, the messages
are redelivered after reconnect.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.redeliver_unacknowledged_messages', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.redeliver_unacknowledged_messages" class="source">
<pre><code>def redeliver_unacknowledged_messages(self):
"""
Redelivers all the unacknowledged messages. In failover mode, the
request is ignored if the consumer is not active for the given topic. In
shared mode, the consumer's messages to be redelivered are distributed
across all the connected consumers. This is a non-blocking call and
doesn't throw an exception. In case the connection breaks, the messages
are redelivered after reconnect.
"""
self._consumer.redeliver_unacknowledged_messages()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.resume_message_listener">
<p>def <span class="ident">resume_message_listener</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Resume receiving the messages via the message listener.
Asynchronously receive all the messages enqueued from the time
<code>pause_message_listener()</code> was called.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.resume_message_listener', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.resume_message_listener" class="source">
<pre><code>def resume_message_listener(self):
"""
Resume receiving the messages via the message listener.
Asynchronously receive all the messages enqueued from the time
`pause_message_listener()` was called.
"""
self._consumer.resume_message_listener()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.seek">
<p>def <span class="ident">seek</span>(</p><p>self, messageid)</p>
</div>
<div class="desc"><p>Reset the subscription associated with this consumer to a specific message id or publish timestamp.
The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
seek() on the individual partitions instead.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>messageid</code>:
The message id for seek, OR an integer event time to seek to.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.seek', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.seek" class="source">
<pre><code>def seek(self, messageid):
"""
Reset the subscription associated with this consumer to a specific message id or publish timestamp.
The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
seek() on the individual partitions.
**Args**
* `message`:
The message id for seek, OR an integer event time to seek to
"""
self._consumer.seek(messageid)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.subscription_name">
<p>def <span class="ident">subscription_name</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Return the subscription name.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.subscription_name', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.subscription_name" class="source">
<pre><code>def subscription_name(self):
"""
Return the subscription name.
"""
return self._consumer.subscription_name()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.topic">
<p>def <span class="ident">topic</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Return the topic this consumer is subscribed to.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.topic', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.topic" class="source">
<pre><code>def topic(self):
"""
Return the topic this consumer is subscribed to.
"""
return self._consumer.topic()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Consumer.unsubscribe">
<p>def <span class="ident">unsubscribe</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Unsubscribe the current consumer from the topic.</p>
<p>This method will block until the operation is completed. Once the
consumer is unsubscribed, no more messages will be received and
subsequent new messages will not be retained for this consumer.</p>
<p>This consumer object cannot be reused.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.unsubscribe', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Consumer.unsubscribe" class="source">
<pre><code>def unsubscribe(self):
"""
Unsubscribe the current consumer from the topic.
This method will block until the operation is completed. Once the
consumer is unsubscribed, no more messages will be received and
subsequent new messages will not be retained for this consumer.
This consumer object cannot be reused.
"""
return self._consumer.unsubscribe()
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.CryptoKeyReader" class="name">class <span class="ident">CryptoKeyReader</span></p>
<div class="desc"><p>Default crypto key reader implementation</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.CryptoKeyReader', this);">Show source &equiv;</a></p>
<div id="source-pulsar.CryptoKeyReader" class="source">
<pre><code>class CryptoKeyReader:
"""
Default crypto key reader implementation
"""
def __init__(self, public_key_path, private_key_path):
"""
Create crypto key reader.
**Args**
* `public_key_path`: Path to the public key
* `private_key_path`: Path to private key
"""
_check_type(str, public_key_path, 'public_key_path')
_check_type(str, private_key_path, 'private_key_path')
self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.CryptoKeyReader">CryptoKeyReader</a></li>
</ul>
<h3>Instance variables</h3>
<div class="item">
<p id="pulsar.CryptoKeyReader.cryptoKeyReader" class="name">var <span class="ident">cryptoKeyReader</span></p>
<div class="source_cont">
</div>
</div>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.CryptoKeyReader.__init__">
<p>def <span class="ident">__init__</span>(</p><p>self, public_key_path, private_key_path)</p>
</div>
<div class="desc"><p>Create crypto key reader.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>public_key_path</code>: Path to the public key</li>
<li><code>private_key_path</code>: Path to private key</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.CryptoKeyReader.__init__', this);">Show source &equiv;</a></p>
<div id="source-pulsar.CryptoKeyReader.__init__" class="source">
<pre><code>def __init__(self, public_key_path, private_key_path):
"""
Create crypto key reader.
**Args**
* `public_key_path`: Path to the public key
* `private_key_path`: Path to private key
"""
_check_type(str, public_key_path, 'public_key_path')
_check_type(str, private_key_path, 'private_key_path')
self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.Message" class="name">class <span class="ident">Message</span></p>
<div class="desc"><p>Message objects are returned by a consumer, either by calling <code>receive</code> or
through a listener.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message" class="source">
<pre><code>class Message:
"""
Message objects are returned by a consumer, either by calling `receive` or
through a listener.
"""
def data(self):
"""
Returns object typed bytes with the payload of the message.
"""
return self._message.data()
def value(self):
"""
Returns object with the de-serialized version of the message content
"""
return self._schema.decode(self._message.data())
def properties(self):
"""
Return the properties attached to the message. Properties are
application-defined key/value pairs that will be attached to the
message.
"""
return self._message.properties()
def partition_key(self):
"""
Get the partitioning key for the message.
"""
return self._message.partition_key()
def publish_timestamp(self):
"""
Get the timestamp in milliseconds with the message publish time.
"""
return self._message.publish_timestamp()
def event_timestamp(self):
"""
Get the timestamp in milliseconds with the message event time.
"""
return self._message.event_timestamp()
def message_id(self):
"""
The message ID that can be used to refere to this particular message.
"""
return self._message.message_id()
def topic_name(self):
"""
Get the topic Name from which this message originated from
"""
return self._message.topic_name()
def redelivery_count(self):
"""
Get the redelivery count for this message
"""
return self._message.redelivery_count()
def schema_version(self):
"""
Get the schema version for this message
"""
return self._message.schema_version()
@staticmethod
def _wrap(_message):
self = Message()
self._message = _message
return self
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.Message">Message</a></li>
</ul>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.Message.data">
<p>def <span class="ident">data</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Returns the payload of the message as a <code>bytes</code> object.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.data', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.data" class="source">
<pre><code>def data(self):
"""
Returns object typed bytes with the payload of the message.
"""
return self._message.data()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Message.event_timestamp">
<p>def <span class="ident">event_timestamp</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Get the timestamp in milliseconds with the message event time.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.event_timestamp', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.event_timestamp" class="source">
<pre><code>def event_timestamp(self):
"""
Get the timestamp in milliseconds with the message event time.
"""
return self._message.event_timestamp()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Message.message_id">
<p>def <span class="ident">message_id</span>(</p><p>self)</p>
</div>
<div class="desc"><p>The message ID that can be used to refer to this particular message.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.message_id', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.message_id" class="source">
<pre><code>def message_id(self):
"""
The message ID that can be used to refere to this particular message.
"""
return self._message.message_id()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Message.partition_key">
<p>def <span class="ident">partition_key</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Get the partitioning key for the message.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.partition_key', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.partition_key" class="source">
<pre><code>def partition_key(self):
"""
Get the partitioning key for the message.
"""
return self._message.partition_key()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Message.properties">
<p>def <span class="ident">properties</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Return the properties attached to the message. Properties are
application-defined key/value pairs that will be attached to the
message.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.properties', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.properties" class="source">
<pre><code>def properties(self):
"""
Return the properties attached to the message. Properties are
application-defined key/value pairs that will be attached to the
message.
"""
return self._message.properties()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Message.publish_timestamp">
<p>def <span class="ident">publish_timestamp</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Get the timestamp in milliseconds with the message publish time.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.publish_timestamp', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.publish_timestamp" class="source">
<pre><code>def publish_timestamp(self):
"""
Get the timestamp in milliseconds with the message publish time.
"""
return self._message.publish_timestamp()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Message.redelivery_count">
<p>def <span class="ident">redelivery_count</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Get the redelivery count for this message</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.redelivery_count', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.redelivery_count" class="source">
<pre><code>def redelivery_count(self):
"""
Get the redelivery count for this message
"""
return self._message.redelivery_count()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Message.schema_version">
<p>def <span class="ident">schema_version</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Get the schema version for this message</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.schema_version', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.schema_version" class="source">
<pre><code>def schema_version(self):
"""
Get the schema version for this message
"""
return self._message.schema_version()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Message.topic_name">
<p>def <span class="ident">topic_name</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Get the name of the topic from which this message originated.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.topic_name', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.topic_name" class="source">
<pre><code>def topic_name(self):
"""
Get the topic Name from which this message originated from
"""
return self._message.topic_name()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Message.value">
<p>def <span class="ident">value</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Returns an object with the de-serialized version of the message content.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.value', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Message.value" class="source">
<pre><code>def value(self):
"""
Returns object with the de-serialized version of the message content
"""
return self._schema.decode(self._message.data())
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.MessageBatch" class="name">class <span class="ident">MessageBatch</span></p>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageBatch" class="source">
<pre><code>class MessageBatch:
def __init__(self):
self._msg_batch = _pulsar.MessageBatch()
def with_message_id(self, msg_id):
if not isinstance(msg_id, _pulsar.MessageId):
if isinstance(msg_id, MessageId):
msg_id = msg_id._msg_id
else:
raise TypeError("unknown message id type")
self._msg_batch.with_message_id(msg_id)
return self
def parse_from(self, data, size):
self._msg_batch.parse_from(data, size)
_msgs = self._msg_batch.messages()
return list(map(Message._wrap, _msgs))
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.MessageBatch">MessageBatch</a></li>
</ul>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.MessageBatch.__init__">
<p>def <span class="ident">__init__</span>(</p><p>self)</p>
</div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.__init__', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageBatch.__init__" class="source">
<pre><code>def __init__(self):
self._msg_batch = _pulsar.MessageBatch()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.MessageBatch.parse_from">
<p>def <span class="ident">parse_from</span>(</p><p>self, data, size)</p>
</div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.parse_from', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageBatch.parse_from" class="source">
<pre><code>def parse_from(self, data, size):
self._msg_batch.parse_from(data, size)
_msgs = self._msg_batch.messages()
return list(map(Message._wrap, _msgs))
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.MessageBatch.with_message_id">
<p>def <span class="ident">with_message_id</span>(</p><p>self, msg_id)</p>
</div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.with_message_id', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageBatch.with_message_id" class="source">
<pre><code>def with_message_id(self, msg_id):
if not isinstance(msg_id, _pulsar.MessageId):
if isinstance(msg_id, MessageId):
msg_id = msg_id._msg_id
else:
raise TypeError("unknown message id type")
self._msg_batch.with_message_id(msg_id)
return self
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.MessageId" class="name">class <span class="ident">MessageId</span></p>
<div class="desc"><p>Represents a message id</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageId" class="source">
<pre><code>class MessageId:
"""
Represents a message id
"""
def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
'Represents the earliest message stored in a topic'
earliest = _pulsar.MessageId.earliest
'Represents the latest message published on a topic'
latest = _pulsar.MessageId.latest
def ledger_id(self):
return self._msg_id.ledger_id()
def entry_id(self):
return self._msg_id.entry_id()
def batch_index(self):
return self._msg_id.batch_index()
def partition(self):
return self._msg_id.partition()
def serialize(self):
"""
Returns a bytes representation of the message id.
This bytes sequence can be stored and later deserialized.
"""
return self._msg_id.serialize()
@staticmethod
def deserialize(message_id_bytes):
"""
Deserialize a message id object from a previously
serialized bytes sequence.
"""
return _pulsar.MessageId.deserialize(message_id_bytes)
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.MessageId">MessageId</a></li>
</ul>
<h3>Class variables</h3>
<div class="item">
<p id="pulsar.MessageId.earliest" class="name">var <span class="ident">earliest</span></p>
<div class="desc"><p>Represents the earliest message stored in a topic</p></div>
<div class="source_cont">
</div>
</div>
<div class="item">
<p id="pulsar.MessageId.latest" class="name">var <span class="ident">latest</span></p>
<div class="desc"><p>Represents the latest message published on a topic</p></div>
<div class="source_cont">
</div>
</div>
<h3>Static methods</h3>
<div class="item">
<div class="name def" id="pulsar.MessageId.deserialize">
<p>def <span class="ident">deserialize</span>(</p><p>message_id_bytes)</p>
</div>
<div class="desc"><p>Deserialize a message id object from a previously
serialized bytes sequence.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.deserialize', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageId.deserialize" class="source">
<pre><code>@staticmethod
def deserialize(message_id_bytes):
"""
Deserialize a message id object from a previously
serialized bytes sequence.
"""
return _pulsar.MessageId.deserialize(message_id_bytes)
</code></pre>
</div>
</div>
</div>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.MessageId.__init__">
<p>def <span class="ident">__init__</span>(</p><p>self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1)</p>
</div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.__init__', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageId.__init__" class="source">
<pre><code>def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.MessageId.batch_index">
<p>def <span class="ident">batch_index</span>(</p><p>self)</p>
</div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.batch_index', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageId.batch_index" class="source">
<pre><code>def batch_index(self):
return self._msg_id.batch_index()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.MessageId.entry_id">
<p>def <span class="ident">entry_id</span>(</p><p>self)</p>
</div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.entry_id', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageId.entry_id" class="source">
<pre><code>def entry_id(self):
return self._msg_id.entry_id()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.MessageId.ledger_id">
<p>def <span class="ident">ledger_id</span>(</p><p>self)</p>
</div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.ledger_id', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageId.ledger_id" class="source">
<pre><code>def ledger_id(self):
return self._msg_id.ledger_id()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.MessageId.partition">
<p>def <span class="ident">partition</span>(</p><p>self)</p>
</div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.partition', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageId.partition" class="source">
<pre><code>def partition(self):
return self._msg_id.partition()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.MessageId.serialize">
<p>def <span class="ident">serialize</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Returns a bytes representation of the message id.
This bytes sequence can be stored and later deserialized.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.serialize', this);">Show source &equiv;</a></p>
<div id="source-pulsar.MessageId.serialize" class="source">
<pre><code>def serialize(self):
"""
Returns a bytes representation of the message id.
This bytes sequence can be stored and later deserialized.
"""
return self._msg_id.serialize()
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.Producer" class="name">class <span class="ident">Producer</span></p>
<div class="desc"><p>The Pulsar message producer, used to publish messages on a topic.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Producer" class="source">
<pre><code>class Producer:
"""
The Pulsar message producer, used to publish messages on a topic.
"""
def topic(self):
"""
Return the topic which producer is publishing to
"""
return self._producer.topic()
def producer_name(self):
"""
Return the producer name which could have been assigned by the
system or specified by the client
"""
return self._producer.producer_name()
def last_sequence_id(self):
"""
Get the last sequence id that was published by this producer.
This represent either the automatically assigned or custom sequence id
(set on the `MessageBuilder`) that was published and acknowledged by the broker.
After recreating a producer with the same producer name, this will return the
last message that was published in the previous producer session, or -1 if
there no message was ever published.
"""
return self._producer.last_sequence_id()
def send(self, content,
properties=None,
partition_key=None,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
event_timestamp=None,
deliver_at=None,
deliver_after=None,
):
"""
Publish a message on the topic. Blocks until the message is acknowledged
Returns a `MessageId` object that represents where the message is persisted.
**Args**
* `content`:
A `bytes` object with the message payload.
**Options**
* `properties`:
A dict of application-defined string properties.
* `partition_key`:
Sets the partition key for message routing. A hash of this key is used
to determine the message's topic partition.
* `sequence_id`:
Specify a custom sequence id for the message being published.
* `replication_clusters`:
Override namespace replication clusters. Note that it is the caller's
responsibility to provide valid cluster names and that all clusters
have been previously configured as topics. Given an empty list,
the message will replicate according to the namespace configuration.
* `disable_replication`:
Do not replicate this message.
* `event_timestamp`:
Timestamp in millis of the timestamp of event creation
* `deliver_at`:
Specify the this message should not be delivered earlier than the
specified timestamp.
The timestamp is milliseconds and based on UTC
* `deliver_after`:
Specify a delay in timedelta for the delivery of the messages.
"""
msg = self._build_msg(content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after)
return MessageId.deserialize(self._producer.send(msg))
def send_async(self, content, callback,
properties=None,
partition_key=None,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
event_timestamp=None,
deliver_at=None,
deliver_after=None,
):
"""
Send a message asynchronously.
The `callback` will be invoked once the message has been acknowledged
by the broker.
Example:
#!python
def callback(res, msg_id):
print('Message published: %s' % res)
producer.send_async(msg, callback)
When the producer queue is full, by default the message will be rejected
and the callback invoked with an error code.
**Args**
* `content`:
A `bytes` object with the message payload.
**Options**
* `properties`:
A dict of application0-defined string properties.
* `partition_key`:
Sets the partition key for the message routing. A hash of this key is
used to determine the message's topic partition.
* `sequence_id`:
Specify a custom sequence id for the message being published.
* `replication_clusters`: Override namespace replication clusters. Note
that it is the caller's responsibility to provide valid cluster names
and that all clusters have been previously configured as topics.
Given an empty list, the message will replicate per the namespace
configuration.
* `disable_replication`:
Do not replicate this message.
* `event_timestamp`:
Timestamp in millis of the timestamp of event creation
* `deliver_at`:
Specify the this message should not be delivered earlier than the
specified timestamp.
The timestamp is milliseconds and based on UTC
* `deliver_after`:
Specify a delay in timedelta for the delivery of the messages.
"""
msg = self._build_msg(content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after)
self._producer.send_async(msg, callback)
def flush(self):
"""
Flush all the messages buffered in the client and wait until all messages have been
successfully persisted
"""
self._producer.flush()
def close(self):
"""
Close the producer.
"""
self._producer.close()
def _build_msg(self, content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after):
data = self._schema.encode(content)
_check_type(bytes, data, 'data')
_check_type_or_none(dict, properties, 'properties')
_check_type_or_none(str, partition_key, 'partition_key')
_check_type_or_none(int, sequence_id, 'sequence_id')
_check_type_or_none(list, replication_clusters, 'replication_clusters')
_check_type(bool, disable_replication, 'disable_replication')
_check_type_or_none(int, event_timestamp, 'event_timestamp')
_check_type_or_none(int, deliver_at, 'deliver_at')
_check_type_or_none(timedelta, deliver_after, 'deliver_after')
mb = _pulsar.MessageBuilder()
mb.content(data)
if properties:
for k, v in properties.items():
mb.property(k, v)
if partition_key:
mb.partition_key(partition_key)
if sequence_id:
mb.sequence_id(sequence_id)
if replication_clusters:
mb.replication_clusters(replication_clusters)
if disable_replication:
mb.disable_replication(disable_replication)
if event_timestamp:
mb.event_timestamp(event_timestamp)
if deliver_at:
mb.deliver_at(deliver_at)
if deliver_after:
mb.deliver_after(deliver_after)
return mb.build()
def is_connected(self):
"""
Check if the producer is connected or not.
"""
return self._producer.is_connected()
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.Producer">Producer</a></li>
</ul>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.Producer.close">
<p>def <span class="ident">close</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Close the producer.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.close', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Producer.close" class="source">
<pre><code>def close(self):
"""
Close the producer.
"""
self._producer.close()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Producer.flush">
<p>def <span class="ident">flush</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Flush all the messages buffered in the client and wait until all messages have been
successfully persisted</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.flush', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Producer.flush" class="source">
<pre><code>def flush(self):
"""
Flush all the messages buffered in the client and wait until all messages have been
successfully persisted
"""
self._producer.flush()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Producer.is_connected">
<p>def <span class="ident">is_connected</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Check if the producer is connected or not.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.is_connected', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Producer.is_connected" class="source">
<pre><code>def is_connected(self):
"""
Check if the producer is connected or not.
"""
return self._producer.is_connected()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Producer.last_sequence_id">
<p>def <span class="ident">last_sequence_id</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Get the last sequence id that was published by this producer.</p>
<p>This represents either the automatically assigned or custom sequence id
(set on the <code>MessageBuilder</code>) that was published and acknowledged by the broker.</p>
<p>After recreating a producer with the same producer name, this will return the
sequence id of the last message that was published in the previous producer
session, or -1 if no message was ever published.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.last_sequence_id', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Producer.last_sequence_id" class="source">
<pre><code>def last_sequence_id(self):
"""
Get the last sequence id that was published by this producer.
This represent either the automatically assigned or custom sequence id
(set on the `MessageBuilder`) that was published and acknowledged by the broker.
After recreating a producer with the same producer name, this will return the
last message that was published in the previous producer session, or -1 if
there no message was ever published.
"""
return self._producer.last_sequence_id()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Producer.producer_name">
<p>def <span class="ident">producer_name</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Return the producer name which could have been assigned by the
system or specified by the client</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.producer_name', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Producer.producer_name" class="source">
<pre><code>def producer_name(self):
"""
Return the producer name which could have been assigned by the
system or specified by the client
"""
return self._producer.producer_name()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Producer.send">
<p>def <span class="ident">send</span>(</p><p>self, content, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)</p>
</div>
<div class="desc"><p>Publish a message on the topic. Blocks until the message is acknowledged.</p>
<p>Returns a <code>MessageId</code> object that represents where the message is persisted.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>content</code>:
A <code>bytes</code> object with the message payload.</li>
</ul>
<p><strong>Options</strong></p>
<ul>
<li><code>properties</code>:
A dict of application-defined string properties.</li>
<li><code>partition_key</code>:
Sets the partition key for message routing. A hash of this key is used
to determine the message's topic partition.</li>
<li><code>sequence_id</code>:
Specify a custom sequence id for the message being published.</li>
<li><code>replication_clusters</code>:
Override namespace replication clusters. Note that it is the caller's
responsibility to provide valid cluster names and that all clusters
have been previously configured as topics. Given an empty list,
the message will replicate according to the namespace configuration.</li>
<li><code>disable_replication</code>:
Do not replicate this message.</li>
<li><code>event_timestamp</code>:
Timestamp in milliseconds of the event creation time.</li>
<li><code>deliver_at</code>:
Specify that this message should not be delivered earlier than the
specified timestamp.
The timestamp is in milliseconds and based on UTC.</li>
<li><code>deliver_after</code>:
Specify a delay in timedelta for the delivery of the messages.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.send', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Producer.send" class="source">
<pre><code>def send(self, content,
properties=None,
partition_key=None,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
event_timestamp=None,
deliver_at=None,
deliver_after=None,
):
"""
Publish a message on the topic. Blocks until the message is acknowledged
Returns a `MessageId` object that represents where the message is persisted.
**Args**
* `content`:
A `bytes` object with the message payload.
**Options**
* `properties`:
A dict of application-defined string properties.
* `partition_key`:
Sets the partition key for message routing. A hash of this key is used
to determine the message's topic partition.
* `sequence_id`:
Specify a custom sequence id for the message being published.
* `replication_clusters`:
Override namespace replication clusters. Note that it is the caller's
responsibility to provide valid cluster names and that all clusters
have been previously configured as topics. Given an empty list,
the message will replicate according to the namespace configuration.
* `disable_replication`:
Do not replicate this message.
* `event_timestamp`:
Timestamp in millis of the timestamp of event creation
* `deliver_at`:
Specify the this message should not be delivered earlier than the
specified timestamp.
The timestamp is milliseconds and based on UTC
* `deliver_after`:
Specify a delay in timedelta for the delivery of the messages.
"""
msg = self._build_msg(content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after)
return MessageId.deserialize(self._producer.send(msg))
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Producer.send_async">
<p>def <span class="ident">send_async</span>(</p><p>self, content, callback, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)</p>
</div>
<div class="desc"><p>Send a message asynchronously.</p>
<p>The <code>callback</code> will be invoked once the message has been acknowledged
by the broker.</p>
<p>Example:</p>
<pre><code>#!python
def callback(res, msg_id):
print('Message published: %s' % res)
producer.send_async(msg, callback)
</code></pre>
<p>When the producer queue is full, by default the message will be rejected
and the callback invoked with an error code.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>content</code>:
A <code>bytes</code> object with the message payload.</li>
</ul>
<p><strong>Options</strong></p>
<ul>
<li><code>properties</code>:
A dict of application-defined string properties.</li>
<li><code>partition_key</code>:
Sets the partition key for the message routing. A hash of this key is
used to determine the message's topic partition.</li>
<li><code>sequence_id</code>:
Specify a custom sequence id for the message being published.</li>
<li><code>replication_clusters</code>: Override namespace replication clusters. Note
that it is the caller's responsibility to provide valid cluster names
and that all clusters have been previously configured as topics.
Given an empty list, the message will replicate per the namespace
configuration.</li>
<li><code>disable_replication</code>:
Do not replicate this message.</li>
<li><code>event_timestamp</code>:
Event creation timestamp, in milliseconds.</li>
<li><code>deliver_at</code>:
Specify that this message should not be delivered earlier than the
specified timestamp.
The timestamp is in milliseconds and based on UTC.</li>
<li><code>deliver_after</code>:
Specify a delay in timedelta for the delivery of the messages.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.send_async', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Producer.send_async" class="source">
<pre><code>def send_async(self, content, callback,
properties=None,
partition_key=None,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
event_timestamp=None,
deliver_at=None,
deliver_after=None,
):
"""
Send a message asynchronously.
The `callback` will be invoked once the message has been acknowledged
by the broker.
Example:
#!python
def callback(res, msg_id):
print('Message published: %s' % res)
producer.send_async(msg, callback)
When the producer queue is full, by default the message will be rejected
and the callback invoked with an error code.
**Args**
* `content`:
A `bytes` object with the message payload.
**Options**
* `properties`:
A dict of application-defined string properties.
* `partition_key`:
Sets the partition key for the message routing. A hash of this key is
used to determine the message's topic partition.
* `sequence_id`:
Specify a custom sequence id for the message being published.
* `replication_clusters`: Override namespace replication clusters. Note
that it is the caller's responsibility to provide valid cluster names
and that all clusters have been previously configured as topics.
Given an empty list, the message will replicate per the namespace
configuration.
* `disable_replication`:
Do not replicate this message.
* `event_timestamp`:
Event creation timestamp, in milliseconds.
* `deliver_at`:
Specify that this message should not be delivered earlier than the
specified timestamp.
The timestamp is in milliseconds and based on UTC.
* `deliver_after`:
Specify a delay in timedelta for the delivery of the messages.
"""
msg = self._build_msg(content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after)
self._producer.send_async(msg, callback)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Producer.topic">
<p>def <span class="ident">topic</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Return the topic which the producer is publishing to.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.topic', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Producer.topic" class="source">
<pre><code>def topic(self):
"""
Return the topic which the producer is publishing to.
"""
return self._producer.topic()
</code></pre>
</div>
</div>
</div>
</div>
</div>
<div class="item">
<p id="pulsar.Reader" class="name">class <span class="ident">Reader</span></p>
<div class="desc"><p>Pulsar topic reader.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Reader" class="source">
<pre><code>class Reader:
"""
Pulsar topic reader.
"""
def topic(self):
"""
Return the topic this reader is reading from.
"""
return self._reader.topic()
def read_next(self, timeout_millis=None):
"""
Read a single message.
If a message is not immediately available, this method will block until
a new message is available.
**Options**
* `timeout_millis`:
If specified, the receive will raise an exception if a message is not
available within the timeout.
"""
if timeout_millis is None:
msg = self._reader.read_next()
else:
_check_type(int, timeout_millis, 'timeout_millis')
msg = self._reader.read_next(timeout_millis)
m = Message()
m._message = msg
m._schema = self._schema
return m
def has_message_available(self):
"""
Check if there is any message available to read from the current position.
"""
return self._reader.has_message_available();
def seek(self, messageid):
"""
Reset this reader to a specific message id or publish timestamp.
The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
seek() on the individual partitions instead.
**Args**
* `messageid`:
The message id for seek, OR an integer publish timestamp to seek to.
"""
self._reader.seek(messageid)
def close(self):
"""
Close the reader.
"""
self._reader.close()
self._client._consumers.remove(self)
def is_connected(self):
"""
Check if the reader is connected or not.
"""
return self._reader.is_connected()
</code></pre>
</div>
</div>
<div class="class">
<h3>Ancestors (in MRO)</h3>
<ul class="class_list">
<li><a href="#pulsar.Reader">Reader</a></li>
</ul>
<h3>Methods</h3>
<div class="item">
<div class="name def" id="pulsar.Reader.close">
<p>def <span class="ident">close</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Close the reader.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.close', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Reader.close" class="source">
<pre><code>def close(self):
"""
Close the reader.
"""
self._reader.close()
self._client._consumers.remove(self)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Reader.has_message_available">
<p>def <span class="ident">has_message_available</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Check if there is any message available to read from the current position.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.has_message_available', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Reader.has_message_available" class="source">
<pre><code>def has_message_available(self):
"""
Check if there is any message available to read from the current position.
"""
return self._reader.has_message_available();
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Reader.is_connected">
<p>def <span class="ident">is_connected</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Check if the reader is connected or not.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.is_connected', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Reader.is_connected" class="source">
<pre><code>def is_connected(self):
"""
Check if the reader is connected or not.
"""
return self._reader.is_connected()
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Reader.read_next">
<p>def <span class="ident">read_next</span>(</p><p>self, timeout_millis=None)</p>
</div>
<div class="desc"><p>Read a single message.</p>
<p>If a message is not immediately available, this method will block until
a new message is available.</p>
<p><strong>Options</strong></p>
<ul>
<li><code>timeout_millis</code>:
If specified, the receive will raise an exception if a message is not
available within the timeout.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.read_next', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Reader.read_next" class="source">
<pre><code>def read_next(self, timeout_millis=None):
"""
Read a single message.
If a message is not immediately available, this method will block until
a new message is available.
**Options**
* `timeout_millis`:
If specified, the receive will raise an exception if a message is not
available within the timeout.
"""
if timeout_millis is None:
msg = self._reader.read_next()
else:
_check_type(int, timeout_millis, 'timeout_millis')
msg = self._reader.read_next(timeout_millis)
m = Message()
m._message = msg
m._schema = self._schema
return m
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Reader.seek">
<p>def <span class="ident">seek</span>(</p><p>self, messageid)</p>
</div>
<div class="desc"><p>Reset this reader to a specific message id or publish timestamp.
The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
seek() on the individual partitions instead.</p>
<p><strong>Args</strong></p>
<ul>
<li><code>messageid</code>:
The message id for seek, OR an integer publish timestamp to seek to.</li>
</ul></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.seek', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Reader.seek" class="source">
<pre><code>def seek(self, messageid):
"""
Reset this reader to a specific message id or publish timestamp.
The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
seek() on the individual partitions instead.
**Args**
* `messageid`:
The message id for seek, OR an integer publish timestamp to seek to.
"""
self._reader.seek(messageid)
</code></pre>
</div>
</div>
</div>
<div class="item">
<div class="name def" id="pulsar.Reader.topic">
<p>def <span class="ident">topic</span>(</p><p>self)</p>
</div>
<div class="desc"><p>Return the topic this reader is reading from.</p></div>
<div class="source_cont">
<p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.topic', this);">Show source &equiv;</a></p>
<div id="source-pulsar.Reader.topic" class="source">
<pre><code>def topic(self):
"""
Return the topic this reader is reading from.
"""
return self._reader.topic()
</code></pre>
</div>
</div>
</div>
</div>
</div>
<h2 class="section-title" id="header-submodules">Sub-modules</h2>
<div class="item">
<p class="name"><a href="exceptions.m.html">pulsar.exceptions</a></p>
</div>
<div class="item">
<p class="name"><a href="functions/index.html">pulsar.functions</a></p>
</div>
<div class="item">
<p class="name"><a href="schema/index.html">pulsar.schema</a></p>
</div>
</section>
</article>
<div class="clear"> </div>
<footer id="footer">
<p>
Documentation generated by
<a href="https://github.com/BurntSushi/pdoc">pdoc 0.3.2</a>
</p>
<p>pdoc is in the public domain with the
<a href="http://unlicense.org">UNLICENSE</a></p>
<p>Design by <a href="http://nadh.in">Kailash Nadh</a></p>
</footer>
</div>
</body>
</html>