| <!doctype html> |
| <head> |
| <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> |
| <meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" /> |
| |
| <title>pulsar API documentation</title> |
| <meta name="description" content="The Pulsar Python client library is based on the existing C++ client library. |
| All the same features ..." /> |
| |
<!-- Web font referenced by the page's body font stack. Loaded over HTTPS so
     the stylesheet is not blocked as mixed content on HTTPS-served pages. -->
<link href="https://fonts.googleapis.com/css?family=Source+Sans+Pro:400,300" rel="stylesheet">
| |
| <style type="text/css"> |
| |
| * { |
| box-sizing: border-box; |
| } |
| /*! normalize.css v1.1.1 | MIT License | git.io/normalize */ |
| |
| /* ========================================================================== |
| HTML5 display definitions |
| ========================================================================== */ |
| |
| /** |
| * Correct `block` display not defined in IE 6/7/8/9 and Firefox 3. |
| */ |
| |
| article, |
| aside, |
| details, |
| figcaption, |
| figure, |
| footer, |
| header, |
| hgroup, |
| main, |
| nav, |
| section, |
| summary { |
| display: block; |
| } |
| |
| /** |
| * Correct `inline-block` display not defined in IE 6/7/8/9 and Firefox 3. |
| */ |
| |
| audio, |
| canvas, |
| video { |
| display: inline-block; |
| *display: inline; |
| *zoom: 1; |
| } |
| |
| /** |
| * Prevent modern browsers from displaying `audio` without controls. |
| * Remove excess height in iOS 5 devices. |
| */ |
| |
| audio:not([controls]) { |
| display: none; |
| height: 0; |
| } |
| |
| /** |
| * Address styling not present in IE 7/8/9, Firefox 3, and Safari 4. |
| * Known issue: no IE 6 support. |
| */ |
| |
| [hidden] { |
| display: none; |
| } |
| |
| /* ========================================================================== |
| Base |
| ========================================================================== */ |
| |
| /** |
| * 1. Prevent system color scheme's background color being used in Firefox, IE, |
| * and Opera. |
| * 2. Prevent system color scheme's text color being used in Firefox, IE, and |
| * Opera. |
| * 3. Correct text resizing oddly in IE 6/7 when body `font-size` is set using |
| * `em` units. |
| * 4. Prevent iOS text size adjust after orientation change, without disabling |
| * user zoom. |
| */ |
| |
| html { |
| background: #fff; /* 1 */ |
| color: #000; /* 2 */ |
| font-size: 100%; /* 3 */ |
| -webkit-text-size-adjust: 100%; /* 4 */ |
| -ms-text-size-adjust: 100%; /* 4 */ |
| } |
| |
| /** |
| * Address `font-family` inconsistency between `textarea` and other form |
| * elements. |
| */ |
| |
| html, |
| button, |
| input, |
| select, |
| textarea { |
| font-family: sans-serif; |
| } |
| |
| /** |
| * Address margins handled incorrectly in IE 6/7. |
| */ |
| |
| body { |
| margin: 0; |
| } |
| |
| /* ========================================================================== |
| Links |
| ========================================================================== */ |
| |
| /** |
| * Address `outline` inconsistency between Chrome and other browsers. |
| */ |
| |
| a:focus { |
| outline: thin dotted; |
| } |
| |
| /** |
| * Improve readability when focused and also mouse hovered in all browsers. |
| */ |
| |
| a:active, |
| a:hover { |
| outline: 0; |
| } |
| |
| /* ========================================================================== |
| Typography |
| ========================================================================== */ |
| |
| /** |
| * Address font sizes and margins set differently in IE 6/7. |
| * Address font sizes within `section` and `article` in Firefox 4+, Safari 5, |
| * and Chrome. |
| */ |
| |
| h1 { |
| font-size: 2em; |
| margin: 0.67em 0; |
| } |
| |
| h2 { |
| font-size: 1.5em; |
| margin: 0.83em 0; |
| } |
| |
| h3 { |
| font-size: 1.17em; |
| margin: 1em 0; |
| } |
| |
| h4 { |
| font-size: 1em; |
| margin: 1.33em 0; |
| } |
| |
| h5 { |
| font-size: 0.83em; |
| margin: 1.67em 0; |
| } |
| |
| h6 { |
| font-size: 0.67em; |
| margin: 2.33em 0; |
| } |
| |
| /** |
| * Address styling not present in IE 7/8/9, Safari 5, and Chrome. |
| */ |
| |
| abbr[title] { |
| border-bottom: 1px dotted; |
| } |
| |
| /** |
| * Address style set to `bolder` in Firefox 3+, Safari 4/5, and Chrome. |
| */ |
| |
| b, |
| strong { |
| font-weight: bold; |
| } |
| |
| blockquote { |
| margin: 1em 40px; |
| } |
| |
| /** |
| * Address styling not present in Safari 5 and Chrome. |
| */ |
| |
| dfn { |
| font-style: italic; |
| } |
| |
| /** |
| * Address differences between Firefox and other browsers. |
| * Known issue: no IE 6/7 normalization. |
| */ |
| |
| hr { |
| -moz-box-sizing: content-box; |
| box-sizing: content-box; |
| height: 0; |
| } |
| |
| /** |
| * Address styling not present in IE 6/7/8/9. |
| */ |
| |
| mark { |
| background: #ff0; |
| color: #000; |
| } |
| |
| /** |
| * Address margins set differently in IE 6/7. |
| */ |
| |
| p, |
| pre { |
| margin: 1em 0; |
| } |
| |
| /** |
| * Correct font family set oddly in IE 6, Safari 4/5, and Chrome. |
| */ |
| |
| code, |
| kbd, |
| pre, |
| samp { |
| font-family: monospace, serif; |
| _font-family: 'courier new', monospace; |
| font-size: 1em; |
| } |
| |
| /** |
| * Improve readability of pre-formatted text in all browsers. |
| */ |
| |
| pre { |
| white-space: pre; |
| white-space: pre-wrap; |
| word-wrap: break-word; |
| } |
| |
| /** |
| * Address CSS quotes not supported in IE 6/7. |
| */ |
| |
| q { |
| quotes: none; |
| } |
| |
| /** |
| * Address `quotes` property not supported in Safari 4. |
| */ |
| |
| q:before, |
| q:after { |
| content: ''; |
| content: none; |
| } |
| |
| /** |
| * Address inconsistent and variable font size in all browsers. |
| */ |
| |
| small { |
| font-size: 80%; |
| } |
| |
| /** |
| * Prevent `sub` and `sup` affecting `line-height` in all browsers. |
| */ |
| |
| sub, |
| sup { |
| font-size: 75%; |
| line-height: 0; |
| position: relative; |
| vertical-align: baseline; |
| } |
| |
| sup { |
| top: -0.5em; |
| } |
| |
| sub { |
| bottom: -0.25em; |
| } |
| |
| /* ========================================================================== |
| Lists |
| ========================================================================== */ |
| |
| /** |
| * Address margins set differently in IE 6/7. |
| */ |
| |
| dl, |
| menu, |
| ol, |
| ul { |
| margin: 1em 0; |
| } |
| |
| dd { |
| margin: 0 0 0 40px; |
| } |
| |
| /** |
| * Address paddings set differently in IE 6/7. |
| */ |
| |
| menu, |
| ol, |
| ul { |
| padding: 0 0 0 40px; |
| } |
| |
| /** |
| * Correct list images handled incorrectly in IE 7. |
| */ |
| |
| nav ul, |
| nav ol { |
| list-style: none; |
| list-style-image: none; |
| } |
| |
| /* ========================================================================== |
| Embedded content |
| ========================================================================== */ |
| |
| /** |
| * 1. Remove border when inside `a` element in IE 6/7/8/9 and Firefox 3. |
| * 2. Improve image quality when scaled in IE 7. |
| */ |
| |
| img { |
| border: 0; /* 1 */ |
| -ms-interpolation-mode: bicubic; /* 2 */ |
| } |
| |
| /** |
| * Correct overflow displayed oddly in IE 9. |
| */ |
| |
| svg:not(:root) { |
| overflow: hidden; |
| } |
| |
| /* ========================================================================== |
| Figures |
| ========================================================================== */ |
| |
| /** |
| * Address margin not present in IE 6/7/8/9, Safari 5, and Opera 11. |
| */ |
| |
| figure { |
| margin: 0; |
| } |
| |
| /* ========================================================================== |
| Forms |
| ========================================================================== */ |
| |
| /** |
| * Correct margin displayed oddly in IE 6/7. |
| */ |
| |
| form { |
| margin: 0; |
| } |
| |
| /** |
| * Define consistent border, margin, and padding. |
| */ |
| |
| fieldset { |
| border: 1px solid #c0c0c0; |
| margin: 0 2px; |
| padding: 0.35em 0.625em 0.75em; |
| } |
| |
| /** |
| * 1. Correct color not being inherited in IE 6/7/8/9. |
| * 2. Correct text not wrapping in Firefox 3. |
| * 3. Correct alignment displayed oddly in IE 6/7. |
| */ |
| |
| legend { |
| border: 0; /* 1 */ |
| padding: 0; |
| white-space: normal; /* 2 */ |
| *margin-left: -7px; /* 3 */ |
| } |
| |
| /** |
| * 1. Correct font size not being inherited in all browsers. |
| * 2. Address margins set differently in IE 6/7, Firefox 3+, Safari 5, |
| * and Chrome. |
| * 3. Improve appearance and consistency in all browsers. |
| */ |
| |
| button, |
| input, |
| select, |
| textarea { |
| font-size: 100%; /* 1 */ |
| margin: 0; /* 2 */ |
| vertical-align: baseline; /* 3 */ |
| *vertical-align: middle; /* 3 */ |
| } |
| |
| /** |
| * Address Firefox 3+ setting `line-height` on `input` using `!important` in |
| * the UA stylesheet. |
| */ |
| |
| button, |
| input { |
| line-height: normal; |
| } |
| |
| /** |
| * Address inconsistent `text-transform` inheritance for `button` and `select`. |
| * All other form control elements do not inherit `text-transform` values. |
| * Correct `button` style inheritance in Chrome, Safari 5+, and IE 6+. |
| * Correct `select` style inheritance in Firefox 4+ and Opera. |
| */ |
| |
| button, |
| select { |
| text-transform: none; |
| } |
| |
| /** |
| * 1. Avoid the WebKit bug in Android 4.0.* where (2) destroys native `audio` |
| * and `video` controls. |
| * 2. Correct inability to style clickable `input` types in iOS. |
| * 3. Improve usability and consistency of cursor style between image-type |
| * `input` and others. |
| * 4. Remove inner spacing in IE 7 without affecting normal text inputs. |
| * Known issue: inner spacing remains in IE 6. |
| */ |
| |
| button, |
| html input[type="button"], /* 1 */ |
| input[type="reset"], |
| input[type="submit"] { |
| -webkit-appearance: button; /* 2 */ |
| cursor: pointer; /* 3 */ |
| *overflow: visible; /* 4 */ |
| } |
| |
| /** |
| * Re-set default cursor for disabled elements. |
| */ |
| |
| button[disabled], |
| html input[disabled] { |
| cursor: default; |
| } |
| |
| /** |
| * 1. Address box sizing set to content-box in IE 8/9. |
| * 2. Remove excess padding in IE 8/9. |
| * 3. Remove excess padding in IE 7. |
| * Known issue: excess padding remains in IE 6. |
| */ |
| |
| input[type="checkbox"], |
| input[type="radio"] { |
| box-sizing: border-box; /* 1 */ |
| padding: 0; /* 2 */ |
| *height: 13px; /* 3 */ |
| *width: 13px; /* 3 */ |
| } |
| |
| /** |
| * 1. Address `appearance` set to `searchfield` in Safari 5 and Chrome. |
| * 2. Address `box-sizing` set to `border-box` in Safari 5 and Chrome |
| * (include `-moz` to future-proof). |
| */ |
| |
| input[type="search"] { |
| -webkit-appearance: textfield; /* 1 */ |
| -moz-box-sizing: content-box; |
| -webkit-box-sizing: content-box; /* 2 */ |
| box-sizing: content-box; |
| } |
| |
| /** |
| * Remove inner padding and search cancel button in Safari 5 and Chrome |
| * on OS X. |
| */ |
| |
| input[type="search"]::-webkit-search-cancel-button, |
| input[type="search"]::-webkit-search-decoration { |
| -webkit-appearance: none; |
| } |
| |
| /** |
| * Remove inner padding and border in Firefox 3+. |
| */ |
| |
| button::-moz-focus-inner, |
| input::-moz-focus-inner { |
| border: 0; |
| padding: 0; |
| } |
| |
| /** |
| * 1. Remove default vertical scrollbar in IE 6/7/8/9. |
| * 2. Improve readability and alignment in all browsers. |
| */ |
| |
| textarea { |
| overflow: auto; /* 1 */ |
| vertical-align: top; /* 2 */ |
| } |
| |
| /* ========================================================================== |
| Tables |
| ========================================================================== */ |
| |
| /** |
| * Remove most spacing between table cells. |
| */ |
| |
| table { |
| border-collapse: collapse; |
| border-spacing: 0; |
| } |
| |
| </style> |
| |
| <style type="text/css"> |
| |
| html, body { |
| margin: 0; |
| padding: 0; |
| min-height: 100%; |
| } |
/* Base page typography. The stack falls back from the web font to common
   system fonts and ends in the `sans-serif` generic family keyword, so a
   sans-serif face is still chosen when none of the named fonts exist. */
body {
  background: #fff;
  font-family: "Source Sans Pro", "Helvetica Neue", Helvetica, sans-serif;
  font-weight: 300;
  font-size: 16px;
  line-height: 1.6em;
}
| #content { |
| width: 70%; |
| max-width: 850px; |
| float: left; |
| padding: 30px 60px; |
| border-left: 1px solid #ddd; |
| } |
| #sidebar { |
| width: 25%; |
| float: left; |
| padding: 30px; |
| overflow: hidden; |
| } |
| #nav { |
| font-size: 130%; |
| margin: 0 0 15px 0; |
| } |
| |
| #top { |
| display: block; |
| position: fixed; |
| bottom: 5px; |
| left: 5px; |
| font-size: .85em; |
| text-transform: uppercase; |
| } |
| |
| #footer { |
| font-size: .75em; |
| padding: 5px 30px; |
| border-top: 1px solid #ddd; |
| text-align: right; |
| } |
| #footer p { |
| margin: 0 0 0 30px; |
| display: inline-block; |
| } |
| |
| h1, h2, h3, h4, h5 { |
| font-weight: 300; |
| } |
| h1 { |
| font-size: 2.5em; |
| line-height: 1.1em; |
| margin: 0 0 .50em 0; |
| } |
| |
| h2 { |
| font-size: 1.75em; |
| margin: 1em 0 .50em 0; |
| } |
| |
| h3 { |
| margin: 25px 0 10px 0; |
| } |
| |
| h4 { |
| margin: 0; |
| font-size: 105%; |
| } |
| |
| a { |
| color: #058; |
| text-decoration: none; |
| transition: color .3s ease-in-out; |
| } |
| |
| a:hover { |
| color: #e08524; |
| transition: color .3s ease-in-out; |
| } |
| |
| pre, code, .mono, .name { |
| font-family: "Ubuntu Mono", "Cousine", "DejaVu Sans Mono", monospace; |
| } |
| |
| .title .name { |
| font-weight: bold; |
| } |
| .section-title { |
| margin-top: 2em; |
| } |
| .ident { |
| color: #900; |
| } |
| |
| code { |
| background: #f9f9f9; |
| } |
| |
| pre { |
| background: #fefefe; |
| border: 1px solid #ddd; |
| box-shadow: 2px 2px 0 #f3f3f3; |
| margin: 0 30px; |
| padding: 15px 30px; |
| } |
| |
| .codehilite { |
| margin: 0 30px 10px 30px; |
| } |
| |
| .codehilite pre { |
| margin: 0; |
| } |
| .codehilite .err { background: #ff3300; color: #fff !important; } |
| |
| table#module-list { |
| font-size: 110%; |
| } |
| |
| table#module-list tr td:first-child { |
| padding-right: 10px; |
| white-space: nowrap; |
| } |
| |
| table#module-list td { |
| vertical-align: top; |
| padding-bottom: 8px; |
| } |
| |
| table#module-list td p { |
| margin: 0 0 7px 0; |
| } |
| |
| .def { |
| display: table; |
| } |
| |
| .def p { |
| display: table-cell; |
| vertical-align: top; |
| text-align: left; |
| } |
| |
| .def p:first-child { |
| white-space: nowrap; |
| } |
| |
| .def p:last-child { |
| width: 100%; |
| } |
| |
| |
| #index { |
| list-style-type: none; |
| margin: 0; |
| padding: 0; |
| } |
| ul#index .class_name { |
| /* font-size: 110%; */ |
| font-weight: bold; |
| } |
| #index ul { |
| margin: 0; |
| } |
| |
| .item { |
| margin: 0 0 15px 0; |
| } |
| |
| .item .class { |
| margin: 0 0 25px 30px; |
| } |
| |
| .item .class ul.class_list { |
| margin: 0 0 20px 0; |
| } |
| |
| .item .name { |
| background: #fafafa; |
| margin: 0; |
| font-weight: bold; |
| padding: 5px 10px; |
| border-radius: 3px; |
| display: inline-block; |
| min-width: 40%; |
| } |
| .item .name:hover { |
| background: #f6f6f6; |
| } |
| |
| .item .empty_desc { |
| margin: 0 0 5px 0; |
| padding: 0; |
| } |
| |
| .item .inheritance { |
| margin: 3px 0 0 30px; |
| } |
| |
| .item .inherited { |
| color: #666; |
| } |
| |
| .item .desc { |
| padding: 0 8px; |
| margin: 0; |
| } |
| |
| .item .desc p { |
| margin: 0 0 10px 0; |
| } |
| |
| .source_cont { |
| margin: 0; |
| padding: 0; |
| } |
| |
| .source_link a { |
| background: #ffc300; |
| font-weight: 400; |
| font-size: .75em; |
| text-transform: uppercase; |
| color: #fff; |
| text-shadow: 1px 1px 0 #f4b700; |
| |
| padding: 3px 8px; |
| border-radius: 2px; |
| transition: background .3s ease-in-out; |
| } |
| .source_link a:hover { |
| background: #FF7200; |
| text-shadow: none; |
| transition: background .3s ease-in-out; |
| } |
| |
| .source { |
| display: none; |
| max-height: 600px; |
| overflow-y: scroll; |
| margin-bottom: 15px; |
| } |
| |
| .source .codehilite { |
| margin: 0; |
| } |
| |
| .desc h1, .desc h2, .desc h3 { |
| font-size: 100% !important; |
| } |
| .clear { |
| clear: both; |
| } |
| |
| @media all and (max-width: 950px) { |
| #sidebar { |
| width: 35%; |
| } |
| #content { |
| width: 65%; |
| } |
| } |
| @media all and (max-width: 650px) { |
| #top { |
| display: none; |
| } |
| #sidebar { |
| float: none; |
| width: auto; |
| } |
| #content { |
| float: none; |
| width: auto; |
| padding: 30px; |
| } |
| |
| #index ul { |
| padding: 0; |
| margin-bottom: 15px; |
| } |
| #index ul li { |
| display: inline-block; |
| margin-right: 30px; |
| } |
| #footer { |
| text-align: left; |
| } |
| #footer p { |
| display: block; |
| margin: inherit; |
| } |
| } |
| |
| /*****************************/ |
| |
| </style> |
| |
| |
| <style type="text/css"> |
| |
| /* ========================================================================== |
| EXAMPLE Media Queries for Responsive Design. |
| These examples override the primary ('mobile first') styles. |
| Modify as content requires. |
| ========================================================================== */ |
| |
| @media only screen and (min-width: 35em) { |
| /* Style adjustments for viewports that meet the condition */ |
| } |
| |
| @media print, |
| (-o-min-device-pixel-ratio: 5/4), |
| (-webkit-min-device-pixel-ratio: 1.25), |
| (min-resolution: 120dpi) { |
| /* Style adjustments for high resolution devices */ |
| } |
| |
| /* ========================================================================== |
| Print styles. |
| Inlined to avoid required HTTP connection: h5bp.com/r |
| ========================================================================== */ |
| |
| @media print { |
| * { |
| background: transparent !important; |
| color: #000 !important; /* Black prints faster: h5bp.com/s */ |
| box-shadow: none !important; |
| text-shadow: none !important; |
| } |
| |
| a, |
| a:visited { |
| text-decoration: underline; |
| } |
| |
| a[href]:after { |
| content: " (" attr(href) ")"; |
| } |
| |
| abbr[title]:after { |
| content: " (" attr(title) ")"; |
| } |
| |
| /* |
| * Don't show links for images, or javascript/internal links |
| */ |
| |
| .ir a:after, |
| a[href^="javascript:"]:after, |
| a[href^="#"]:after { |
| content: ""; |
| } |
| |
| pre, |
| blockquote { |
| border: 1px solid #999; |
| page-break-inside: avoid; |
| } |
| |
| thead { |
| display: table-header-group; /* h5bp.com/t */ |
| } |
| |
| tr, |
| img { |
| page-break-inside: avoid; |
| } |
| |
| img { |
| max-width: 100% !important; |
| } |
| |
| @page { |
| margin: 0.5cm; |
| } |
| |
| p, |
| h2, |
| h3 { |
| orphans: 3; |
| widows: 3; |
| } |
| |
| h2, |
| h3 { |
| page-break-after: avoid; |
| } |
| } |
| |
| </style> |
| |
| <script type="text/javascript"> |
/**
 * Show or hide the element with the given id and update the label of the
 * control that triggered the change.
 *
 * Does nothing when no element with that id exists.
 *
 * @param {string} id - DOM id of the element to show or hide.
 * @param {Element} $link - Element whose innerHTML is set to the new label.
 */
function toggle(id, $link) {
  // Declared locally: a bare assignment here would create an implicit
  // global variable (and throw a ReferenceError in strict mode).
  var $node = document.getElementById(id);
  if (!$node) {
    return;
  }
  // An empty inline `display` value is treated the same as 'none', so the
  // first call on an element hidden only by the stylesheet reveals it.
  if (!$node.style.display || $node.style.display === 'none') {
    $node.style.display = 'block';
    $link.innerHTML = 'Hide source ≢';
  } else {
    $node.style.display = 'none';
    $link.innerHTML = 'Show source ≡';
  }
}
| </script> |
| </head> |
| <body> |
| <a href="#" id="top">Top</a> |
| |
| <div id="container"> |
| |
| |
| <div id="sidebar"> |
| <h1>Index</h1> |
| <ul id="index"> |
| |
| |
| <li class="set"><h3><a href="#header-classes">Classes</a></h3> |
| <ul> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.Authentication">Authentication</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.Authentication.__init__">__init__</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.AuthenticationAthenz">AuthenticationAthenz</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.AuthenticationAthenz.__init__">__init__</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.AuthenticationOauth2">AuthenticationOauth2</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.AuthenticationOauth2.__init__">__init__</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.AuthenticationTLS">AuthenticationTLS</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.AuthenticationTLS.__init__">__init__</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.AuthenticationToken">AuthenticationToken</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.AuthenticationToken.__init__">__init__</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.Client">Client</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.Client.__init__">__init__</a></li> |
| <li class="mono"><a href="#pulsar.Client.close">close</a></li> |
| <li class="mono"><a href="#pulsar.Client.create_producer">create_producer</a></li> |
| <li class="mono"><a href="#pulsar.Client.create_reader">create_reader</a></li> |
| <li class="mono"><a href="#pulsar.Client.get_topic_partitions">get_topic_partitions</a></li> |
| <li class="mono"><a href="#pulsar.Client.shutdown">shutdown</a></li> |
| <li class="mono"><a href="#pulsar.Client.subscribe">subscribe</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.Consumer">Consumer</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.Consumer.acknowledge">acknowledge</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.acknowledge_cumulative">acknowledge_cumulative</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.close">close</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.is_connected">is_connected</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.negative_acknowledge">negative_acknowledge</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.pause_message_listener">pause_message_listener</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.receive">receive</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.redeliver_unacknowledged_messages">redeliver_unacknowledged_messages</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.resume_message_listener">resume_message_listener</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.seek">seek</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.subscription_name">subscription_name</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.topic">topic</a></li> |
| <li class="mono"><a href="#pulsar.Consumer.unsubscribe">unsubscribe</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.CryptoKeyReader">CryptoKeyReader</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.CryptoKeyReader.__init__">__init__</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.Message">Message</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.Message.data">data</a></li> |
| <li class="mono"><a href="#pulsar.Message.event_timestamp">event_timestamp</a></li> |
| <li class="mono"><a href="#pulsar.Message.message_id">message_id</a></li> |
| <li class="mono"><a href="#pulsar.Message.partition_key">partition_key</a></li> |
| <li class="mono"><a href="#pulsar.Message.properties">properties</a></li> |
| <li class="mono"><a href="#pulsar.Message.publish_timestamp">publish_timestamp</a></li> |
| <li class="mono"><a href="#pulsar.Message.redelivery_count">redelivery_count</a></li> |
| <li class="mono"><a href="#pulsar.Message.schema_version">schema_version</a></li> |
| <li class="mono"><a href="#pulsar.Message.topic_name">topic_name</a></li> |
| <li class="mono"><a href="#pulsar.Message.value">value</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.MessageBatch">MessageBatch</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.MessageBatch.__init__">__init__</a></li> |
| <li class="mono"><a href="#pulsar.MessageBatch.parse_from">parse_from</a></li> |
| <li class="mono"><a href="#pulsar.MessageBatch.with_message_id">with_message_id</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.MessageId">MessageId</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.MessageId.deserialize">deserialize</a></li> |
| <li class="mono"><a href="#pulsar.MessageId.__init__">__init__</a></li> |
| <li class="mono"><a href="#pulsar.MessageId.batch_index">batch_index</a></li> |
| <li class="mono"><a href="#pulsar.MessageId.entry_id">entry_id</a></li> |
| <li class="mono"><a href="#pulsar.MessageId.ledger_id">ledger_id</a></li> |
| <li class="mono"><a href="#pulsar.MessageId.partition">partition</a></li> |
| <li class="mono"><a href="#pulsar.MessageId.serialize">serialize</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.Producer">Producer</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.Producer.close">close</a></li> |
| <li class="mono"><a href="#pulsar.Producer.flush">flush</a></li> |
| <li class="mono"><a href="#pulsar.Producer.is_connected">is_connected</a></li> |
| <li class="mono"><a href="#pulsar.Producer.last_sequence_id">last_sequence_id</a></li> |
| <li class="mono"><a href="#pulsar.Producer.producer_name">producer_name</a></li> |
| <li class="mono"><a href="#pulsar.Producer.send">send</a></li> |
| <li class="mono"><a href="#pulsar.Producer.send_async">send_async</a></li> |
| <li class="mono"><a href="#pulsar.Producer.topic">topic</a></li> |
| </ul> |
| |
| </li> |
| <li class="mono"> |
| <span class="class_name"><a href="#pulsar.Reader">Reader</a></span> |
| |
| |
| <ul> |
| <li class="mono"><a href="#pulsar.Reader.close">close</a></li> |
| <li class="mono"><a href="#pulsar.Reader.has_message_available">has_message_available</a></li> |
| <li class="mono"><a href="#pulsar.Reader.is_connected">is_connected</a></li> |
| <li class="mono"><a href="#pulsar.Reader.read_next">read_next</a></li> |
| <li class="mono"><a href="#pulsar.Reader.seek">seek</a></li> |
| <li class="mono"><a href="#pulsar.Reader.topic">topic</a></li> |
| </ul> |
| |
| </li> |
| </ul> |
| </li> |
| |
| <li class="set"><h3><a href="#header-submodules">Sub-modules</a></h3> |
| <ul> |
| <li class="mono"><a href="exceptions.m.html">pulsar.exceptions</a></li> |
| <li class="mono"><a href="functions/index.html">pulsar.functions</a></li> |
| <li class="mono"><a href="schema/index.html">pulsar.schema</a></li> |
| </ul> |
| </li> |
| </ul> |
| </div> |
| |
| <article id="content"> |
| |
| |
| |
| |
| |
| |
| <header id="section-intro"> |
| <h1 class="title"><span class="name">pulsar</span> module</h1> |
| <p>The Pulsar Python client library is based on the existing C++ client library. |
| All the same features are exposed through the Python interface.</p> |
| <p>Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.</p> |
| <h2>Install from PyPI</h2> |
| <p>Download Python wheel binary files for MacOS and Linux |
| directly from the PyPI archive.</p> |
| <pre><code>#!shell |
| $ sudo pip install pulsar-client |
| </code></pre> |
| <h2>Install from sources</h2> |
| <p>Follow the instructions to compile the Pulsar C++ client library. This method |
| will also build the Python binding for the library.</p> |
| <p>To install the Python bindings:</p> |
| <pre><code>#!shell |
| $ cd pulsar-client-cpp/python |
| $ sudo python setup.py install |
| </code></pre> |
| <h2>Examples</h2> |
| <h3><a href="#pulsar.Producer">Producer</a> example</h3> |
| <pre><code>#!python |
| import pulsar |
| |
| client = pulsar.Client('pulsar://localhost:6650') |
| |
| producer = client.create_producer('my-topic') |
| |
| for i in range(10): |
| producer.send(('Hello-%d' % i).encode('utf-8')) |
| |
| client.close() |
| </code></pre> |
| <h3><a href="#pulsar.Consumer">Consumer</a> example</h3> |
| <pre><code>#!python |
| import pulsar |
| |
| client = pulsar.Client('pulsar://localhost:6650') |
| consumer = client.subscribe('my-topic', 'my-subscription') |
| |
| while True: |
| msg = consumer.receive() |
| try: |
| print("Received message '%s' id='%s'" % (msg.data().decode('utf-8'), msg.message_id())) |
| consumer.acknowledge(msg) |
| except Exception: |
| consumer.negative_acknowledge(msg) |
| |
| client.close() |
| </code></pre> |
| <h3><a href="#pulsar.Producer.send_async">Async producer</a> example</h3> |
| <pre><code>#!python |
| import pulsar |
| |
| client = pulsar.Client('pulsar://localhost:6650') |
| |
| producer = client.create_producer( |
| 'my-topic', |
| block_if_queue_full=True, |
| batching_enabled=True, |
| batching_max_publish_delay_ms=10 |
| ) |
| |
| def send_callback(res, msg_id): |
| print('Message published res=%s' % res) |
| |
| for i in range(10): |
| producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback) |
| |
| client.close() |
| </code></pre> |
| |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar', this);">Show source ≡</a></p> |
| <div id="source-pulsar" class="source"> |
| <pre><code># |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| """ |
| The Pulsar Python client library is based on the existing C++ client library. |
| All the same features are exposed through the Python interface. |
| |
| Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8. |
| |
| ## Install from PyPI |
| |
| Download Python wheel binary files for MacOS and Linux |
| directly from the PyPI archive. |
| |
| #!shell |
| $ sudo pip install pulsar-client |
| |
| ## Install from sources |
| |
| Follow the instructions to compile the Pulsar C++ client library. This method |
| will also build the Python binding for the library. |
| |
| To install the Python bindings: |
| |
| #!shell |
| $ cd pulsar-client-cpp/python |
| $ sudo python setup.py install |
| |
| ## Examples |
| |
| ### [Producer](#pulsar.Producer) example |
| |
| #!python |
| import pulsar |
| |
| client = pulsar.Client('pulsar://localhost:6650') |
| |
| producer = client.create_producer('my-topic') |
| |
| for i in range(10): |
| producer.send(('Hello-%d' % i).encode('utf-8')) |
| |
| client.close() |
| |
| ### [Consumer](#pulsar.Consumer) example |
| |
| #!python |
| import pulsar |
| |
| client = pulsar.Client('pulsar://localhost:6650') |
| consumer = client.subscribe('my-topic', 'my-subscription') |
| |
| while True: |
| msg = consumer.receive() |
| try: |
| print("Received message '%s' id='%s'" % (msg.data().decode('utf-8'), msg.message_id())) |
| consumer.acknowledge(msg) |
| except Exception: |
| consumer.negative_acknowledge(msg) |
| |
| client.close() |
| |
| ### [Async producer](#pulsar.Producer.send_async) example |
| |
| #!python |
| import pulsar |
| |
| client = pulsar.Client('pulsar://localhost:6650') |
| |
| producer = client.create_producer( |
| 'my-topic', |
| block_if_queue_full=True, |
| batching_enabled=True, |
| batching_max_publish_delay_ms=10 |
| ) |
| |
| def send_callback(res, msg_id): |
| print('Message published res=%s' % res) |
| |
| for i in range(10): |
| producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback) |
| |
| client.close() |
| """ |
| |
| import logging |
| import _pulsar |
| |
| from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType # noqa: F401 |
| |
| from pulsar.exceptions import * |
| |
| from pulsar.functions.function import Function |
| from pulsar.functions.context import Context |
| from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe |
| from pulsar import schema |
| _schema = schema |
| |
| import re |
| _retype = type(re.compile('x')) |
| |
| import certifi |
| from datetime import timedelta |
| |
| |
class MessageId:
    """
    Represents a message id.

    Each instance stores a `_pulsar.MessageId` built from the coordinates
    given to the constructor and forwards every accessor to it.
    """

    def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
        """
        Create a message id from its individual coordinates.

        **Options**

        * `partition`: Partition component of the id (default `-1`).
        * `ledger_id`: Ledger component of the id (default `-1`).
        * `entry_id`: Entry component of the id (default `-1`).
        * `batch_index`: Batch index component of the id (default `-1`).
        """
        self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)

    # The describing string is placed *after* each assignment: documentation
    # tools that support attribute docstrings attach the string that follows
    # an assignment, so a string placed before it ends up on the wrong name.
    earliest = _pulsar.MessageId.earliest
    'Represents the earliest message stored in a topic'

    latest = _pulsar.MessageId.latest
    'Represents the latest message published on a topic'

    def ledger_id(self):
        """
        Return the ledger id component of this message id.
        """
        return self._msg_id.ledger_id()

    def entry_id(self):
        """
        Return the entry id component of this message id.
        """
        return self._msg_id.entry_id()

    def batch_index(self):
        """
        Return the batch index component of this message id.
        """
        return self._msg_id.batch_index()

    def partition(self):
        """
        Return the partition component of this message id.
        """
        return self._msg_id.partition()

    def serialize(self):
        """
        Returns a bytes representation of the message id.
        This bytes sequence can be stored and later deserialized.
        """
        return self._msg_id.serialize()

    @staticmethod
    def deserialize(message_id_bytes):
        """
        Deserialize a message id object from a previously
        serialized bytes sequence.

        The result is whatever `_pulsar.MessageId.deserialize` produces; it
        is returned as-is and is not wrapped in this class.
        """
        return _pulsar.MessageId.deserialize(message_id_bytes)
| |
| |
class Message:
    """
    Message objects are returned by a consumer, either by calling `receive` or
    through a listener.

    Instances delegate to an underlying message object stored in `_message`.
    An optional schema stored in `_schema` is used by `value()` to decode the
    payload.
    """

    # Default for messages that never had a schema attached (for example the
    # ones built through `_wrap` without a schema). Without this class-level
    # default `value()` would fail with AttributeError on such messages.
    _schema = None

    def data(self):
        """
        Returns object typed bytes with the payload of the message.
        """
        return self._message.data()

    def value(self):
        """
        Returns object with the de-serialized version of the message content.

        When no schema has been attached to this message the raw payload is
        returned unchanged.
        """
        payload = self._message.data()
        if self._schema is None:
            return payload
        return self._schema.decode(payload)

    def properties(self):
        """
        Return the properties attached to the message. Properties are
        application-defined key/value pairs that will be attached to the
        message.
        """
        return self._message.properties()

    def partition_key(self):
        """
        Get the partitioning key for the message.
        """
        return self._message.partition_key()

    def publish_timestamp(self):
        """
        Get the timestamp in milliseconds with the message publish time.
        """
        return self._message.publish_timestamp()

    def event_timestamp(self):
        """
        Get the timestamp in milliseconds with the message event time.
        """
        return self._message.event_timestamp()

    def message_id(self):
        """
        The message ID that can be used to refer to this particular message.
        """
        return self._message.message_id()

    def topic_name(self):
        """
        Get the name of the topic from which this message originated.
        """
        return self._message.topic_name()

    def redelivery_count(self):
        """
        Get the redelivery count for this message.
        """
        return self._message.redelivery_count()

    def schema_version(self):
        """
        Get the schema version for this message.
        """
        return self._message.schema_version()

    @staticmethod
    def _wrap(_message, schema=None):
        """
        Build a `Message` around an underlying message object.

        **Args**

        * `_message`: The underlying message object to delegate to.

        **Options**

        * `schema`: Object exposing `decode(data)`, used by `value()`. When
          omitted, `value()` returns the raw payload.
        """
        self = Message()
        self._message = _message
        if schema is not None:
            self._schema = schema
        return self
| |
| |
class MessageBatch:
    """
    Wrapper around a `_pulsar.MessageBatch` that yields `Message` objects.
    """

    def __init__(self):
        """
        Create the underlying `_pulsar.MessageBatch`.
        """
        self._msg_batch = _pulsar.MessageBatch()

    def with_message_id(self, msg_id):
        """
        Set the message id on the batch and return `self` for chaining.

        Accepts either a `_pulsar.MessageId` or a `MessageId` wrapper (whose
        `_msg_id` is used). Any other type raises `TypeError`.
        """
        if isinstance(msg_id, _pulsar.MessageId):
            native_id = msg_id
        elif isinstance(msg_id, MessageId):
            native_id = msg_id._msg_id
        else:
            raise TypeError("unknown message id type")
        self._msg_batch.with_message_id(native_id)
        return self

    def parse_from(self, data, size):
        """
        Parse `data` (with the given `size`) into the batch and return the
        resulting messages as a list of `Message` objects.
        """
        self._msg_batch.parse_from(data, size)
        return [Message._wrap(native_msg) for native_msg in self._msg_batch.messages()]
| |
| |
class Authentication:
    """
    Authentication provider object. Used to load authentication from an external
    shared library.
    """
    def __init__(self, dynamicLibPath, authParamsString):
        """
        Create the authentication provider instance.

        Both arguments are validated as `str` before the provider is built
        and stored in `self.auth`.

        **Args**

        * `dynamicLibPath`: Path to the authentication provider shared library
          (such as `tls.so`)
        * `authParamsString`: Comma-separated list of provider-specific
          configuration params
        """
        # Validate in declaration order so the first offending argument is
        # the one reported.
        for arg_name, arg_value in (('dynamicLibPath', dynamicLibPath),
                                    ('authParamsString', authParamsString)):
            _check_type(str, arg_value, arg_name)
        provider = _pulsar.Authentication(dynamicLibPath, authParamsString)
        self.auth = provider
| |
| |
class AuthenticationTLS(Authentication):
    """
    TLS Authentication implementation
    """
    def __init__(self, certificate_path, private_key_path):
        """
        Create the TLS authentication provider instance.

        Both arguments are validated as `str` before the provider is built
        and stored in `self.auth`.

        **Args**

        * `certificate_path`: Path to the public certificate
        * `private_key_path`: Path to private TLS key
        """
        # Validate in declaration order so the first offending argument is
        # the one reported.
        for arg_name, arg_value in (('certificate_path', certificate_path),
                                    ('private_key_path', private_key_path)):
            _check_type(str, arg_value, arg_name)
        provider = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
        self.auth = provider
| |
| |
class AuthenticationToken(Authentication):
    """
    Token based authentication implementation
    """
    def __init__(self, token):
        """
        Create the token authentication provider instance.

        Raises `ValueError` when `token` is neither a `str` nor a callable.

        **Args**

        * `token`: A string containing the token or a function that provides a
          string with the token
        """
        if not isinstance(token, str) and not callable(token):
            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
        provider = _pulsar.AuthenticationToken(token)
        self.auth = provider
| |
| |
class AuthenticationAthenz(Authentication):
    """
    Athenz Authentication implementation
    """
    def __init__(self, auth_params_string):
        """
        Create the Athenz authentication provider instance.

        The argument is validated as `str` before the provider is built and
        stored in `self.auth`.

        **Args**

        * `auth_params_string`: JSON encoded configuration for Athenz client
        """
        _check_type(str, auth_params_string, 'auth_params_string')
        provider = _pulsar.AuthenticationAthenz(auth_params_string)
        self.auth = provider
| |
class AuthenticationOauth2(Authentication):
    """
    Oauth2 Authentication implementation
    """
    def __init__(self, auth_params_string):
        """
        Create the Oauth2 authentication provider instance.

        The argument is validated as `str` before the provider is built and
        stored in `self.auth`.

        **Args**

        * `auth_params_string`: JSON encoded configuration for Oauth2 client
        """
        _check_type(str, auth_params_string, 'auth_params_string')
        provider = _pulsar.AuthenticationOauth2(auth_params_string)
        self.auth = provider
| |
class Client:
    """
    The Pulsar client. A single client instance can be used to create producers
    and consumers on multiple topics.

    The client will share the same connection pool and threads across all
    producers and consumers.
    """

    def __init__(self, service_url,
                 authentication=None,
                 operation_timeout_seconds=30,
                 io_threads=1,
                 message_listener_threads=1,
                 concurrent_lookup_requests=50000,
                 log_conf_file_path=None,
                 use_tls=False,
                 tls_trust_certs_file_path=None,
                 tls_allow_insecure_connection=False,
                 tls_validate_hostname=False,
                 logger=None,
                 connection_timeout_ms=10000,
                 ):
        """
        Create a new Pulsar client instance.

        **Args**

        * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/

        **Options**

        * `authentication`:
          Set the authentication provider to be used with the broker. For example:
          `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
        * `operation_timeout_seconds`:
          Set timeout on client operations (subscribe, create producer, close,
          unsubscribe).
        * `io_threads`:
          Set the number of IO threads to be used by the Pulsar client.
        * `message_listener_threads`:
          Set the number of threads to be used by the Pulsar client when
          delivering messages through message listener. The default is 1 thread
          per Pulsar client. If using more than 1 thread, messages for distinct
          `message_listener`s will be delivered in different threads, however a
          single `MessageListener` will always be assigned to the same thread.
        * `concurrent_lookup_requests`:
          Number of concurrent lookup-requests allowed on each broker connection
          to prevent overload on the broker.
        * `log_conf_file_path`:
          Initialize log4cxx from a configuration file.
        * `use_tls`:
          Configure whether to use TLS encryption on the connection. This setting
          is deprecated. TLS will be automatically enabled if the `service_url` is
          set to `pulsar+ssl://` or `https://`
        * `tls_trust_certs_file_path`:
          Set the path to the trusted TLS certificate file. If empty defaults to
          certifi.
        * `tls_allow_insecure_connection`:
          Configure whether the Pulsar client accepts untrusted TLS certificates
          from the broker.
        * `tls_validate_hostname`:
          Configure whether the Pulsar client validates that the hostname of the
          endpoint, matches the common name on the TLS certificate presented by
          the endpoint.
        * `logger`:
          Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
        * `connection_timeout_ms`:
          Set timeout in milliseconds on TCP connections.
        """
        _check_type(str, service_url, 'service_url')
        _check_type_or_none(Authentication, authentication, 'authentication')
        _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
        _check_type(int, connection_timeout_ms, 'connection_timeout_ms')
        _check_type(int, io_threads, 'io_threads')
        _check_type(int, message_listener_threads, 'message_listener_threads')
        _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
        _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
        _check_type(bool, use_tls, 'use_tls')
        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
        _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
        _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
        _check_type_or_none(logging.Logger, logger, 'logger')

        conf = _pulsar.ClientConfiguration()
        if authentication:
            conf.authentication(authentication.auth)
        conf.operation_timeout_seconds(operation_timeout_seconds)
        conf.connection_timeout(connection_timeout_ms)
        conf.io_threads(io_threads)
        conf.message_listener_threads(message_listener_threads)
        conf.concurrent_lookup_requests(concurrent_lookup_requests)
        if log_conf_file_path:
            conf.log_conf_file_path(log_conf_file_path)
        if logger:
            conf.set_logger(logger)
        # TLS is switched on either explicitly or implicitly by the URL scheme.
        if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
            conf.use_tls(True)
        # Fall back to the certifi bundle when no trust store was given.
        if tls_trust_certs_file_path:
            conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
        else:
            conf.tls_trust_certs_file_path(certifi.where())
        conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
        conf.tls_validate_hostname(tls_validate_hostname)
        self._client = _pulsar.Client(service_url, conf)
        # Consumers and readers created through this client are recorded here.
        self._consumers = []

    def create_producer(self, topic,
                        producer_name=None,
                        schema=schema.BytesSchema(),
                        initial_sequence_id=None,
                        send_timeout_millis=30000,
                        compression_type=CompressionType.NONE,
                        max_pending_messages=1000,
                        max_pending_messages_across_partitions=50000,
                        block_if_queue_full=False,
                        batching_enabled=False,
                        batching_max_messages=1000,
                        batching_max_allowed_size_in_bytes=128*1024,
                        batching_max_publish_delay_ms=10,
                        message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
                        lazy_start_partitioned_producers=False,
                        properties=None,
                        batching_type=BatchingType.Default,
                        encryption_key=None,
                        crypto_key_reader=None
                        ):
        """
        Create a new producer on a given topic.

        Returns a `Producer`. Arguments are type-checked before the producer
        configuration is built.

        **Args**

        * `topic`:
          The topic name

        **Options**

        * `producer_name`:
          Specify a name for the producer. If not assigned,
          the system will generate a globally unique name which can be accessed
          with `Producer.producer_name()`. When specifying a name, it is up to
          the user to ensure that, for a given topic, the producer name is unique
          across all Pulsar's clusters.
        * `schema`:
          Define the schema of the data that will be published by this producer.
          The schema will be used for two purposes:
            - Validate the data format against the topic defined schema
            - Perform serialization/deserialization between data and objects
          An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
        * `initial_sequence_id`:
          Set the baseline for the sequence ids for messages
          published by the producer. First message will be using
          `(initial_sequence_id + 1)` as its sequence id and subsequent messages will
          be assigned incremental sequence ids, if not otherwise specified.
          `None` (the default) leaves the baseline unset; `0` is a valid baseline.
        * `send_timeout_millis`:
          If a message is not acknowledged by the server before the
          `send_timeout` expires, an error will be reported.
        * `compression_type`:
          Set the compression type for the producer. By default, message
          payloads are not compressed. Supported compression types are
          `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
          ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
          release in order to be able to receive messages compressed with ZSTD.
          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
          release in order to be able to receive messages compressed with SNAPPY.
        * `max_pending_messages`:
          Set the max size of the queue holding the messages pending to receive
          an acknowledgment from the broker.
        * `max_pending_messages_across_partitions`:
          Set the max size of the queue holding the messages pending to receive
          an acknowledgment across partitions from the broker.
        * `block_if_queue_full`: Set whether `send_async` operations should
          block when the outgoing message queue is full.
        * `batching_enabled`:
          Set whether message batching is enabled for the producer.
        * `batching_max_messages`:
          Batching limit on the number of messages, passed to the producer
          configuration.
        * `batching_max_allowed_size_in_bytes`:
          Batching limit on the size in bytes, passed to the producer
          configuration.
        * `batching_max_publish_delay_ms`:
          Batching publish delay in milliseconds, passed to the producer
          configuration.
        * `message_routing_mode`:
          Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
          other option is `PartitionsRoutingMode.UseSinglePartition`
        * `lazy_start_partitioned_producers`:
          This config affects producers of partitioned topics only. It controls whether
          producers register and connect immediately to the owner broker of each partition
          or start lazily on demand. The internal producer of one partition is always
          started eagerly, chosen by the routing policy, but the internal producers of
          any additional partitions are started on demand, upon receiving their first
          message.
          Using this mode can reduce the strain on brokers for topics with large numbers of
          partitions and when the SinglePartition routing policy is used without keyed messages.
          Because producer connection can be on demand, this can produce extra send latency
          for the first messages of a given partition.
        * `properties`:
          Sets the properties for the producer. The properties associated with a producer
          can be used for identify a producer at broker side.
        * `batching_type`:
          Sets the batching type for the producer.
          There are two batching type: DefaultBatching and KeyBasedBatching.
            - Default batching
              incoming single messages:
              (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
              batched into single batch message:
              [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]

            - KeyBasedBatching
              incoming single messages:
              (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
              batched into single batch message:
              [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
        * `encryption_key`:
          The key used for symmetric encryption, configured on the producer side
        * `crypto_key_reader`:
          Symmetric encryption class implementation, configuring public key encryption messages for the producer
          and private key decryption messages for the consumer
        """
        # The `schema` parameter shadows the `schema` module inside this
        # method, hence the module-level `_schema` alias for the base class.
        _check_type(str, topic, 'topic')
        _check_type_or_none(str, producer_name, 'producer_name')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
        _check_type(int, send_timeout_millis, 'send_timeout_millis')
        _check_type(CompressionType, compression_type, 'compression_type')
        _check_type(int, max_pending_messages, 'max_pending_messages')
        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
        _check_type(bool, batching_enabled, 'batching_enabled')
        _check_type(int, batching_max_messages, 'batching_max_messages')
        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
        _check_type(PartitionsRoutingMode, message_routing_mode, 'message_routing_mode')
        _check_type_or_none(dict, properties, 'properties')
        _check_type(BatchingType, batching_type, 'batching_type')
        _check_type_or_none(str, encryption_key, 'encryption_key')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
        _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')

        conf = _pulsar.ProducerConfiguration()
        conf.send_timeout_millis(send_timeout_millis)
        conf.compression_type(compression_type)
        conf.max_pending_messages(max_pending_messages)
        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
        conf.block_if_queue_full(block_if_queue_full)
        conf.batching_enabled(batching_enabled)
        conf.batching_max_messages(batching_max_messages)
        conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
        conf.partitions_routing_mode(message_routing_mode)
        conf.batching_type(batching_type)
        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
        if producer_name:
            conf.producer_name(producer_name)
        # Compare against None: a baseline of 0 is valid and must not be
        # dropped by a truthiness test.
        if initial_sequence_id is not None:
            conf.initial_sequence_id(initial_sequence_id)
        if properties:
            for k, v in properties.items():
                conf.property(k, v)

        conf.schema(schema.schema_info())
        if encryption_key:
            conf.encryption_key(encryption_key)
        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        p = Producer()
        p._producer = self._client.create_producer(topic, conf)
        p._schema = schema
        p._client = self._client
        return p

    def subscribe(self, topic, subscription_name,
                  consumer_type=ConsumerType.Exclusive,
                  schema=schema.BytesSchema(),
                  message_listener=None,
                  receiver_queue_size=1000,
                  max_total_receiver_queue_size_across_partitions=50000,
                  consumer_name=None,
                  unacked_messages_timeout_ms=None,
                  broker_consumer_stats_cache_time_ms=30000,
                  negative_ack_redelivery_delay_ms=60000,
                  is_read_compacted=False,
                  properties=None,
                  pattern_auto_discovery_period=60,
                  initial_position=InitialPosition.Latest,
                  crypto_key_reader=None,
                  replicate_subscription_state_enabled=False
                  ):
        """
        Subscribe to the given topic and subscription combination.

        Returns a `Consumer`, which is also recorded on this client. Raises
        `ValueError` when `topic` is not a `str`, a `list` or a compiled
        regular expression.

        **Args**

        * `topic`: The name of the topic, list of topics or regex pattern.
          This method will accept these forms:
            - `topic='my-topic'`
            - `topic=['topic-1', 'topic-2', 'topic-3']`
            - `topic=re.compile('persistent://public/default/topic-*')`
        * `subscription_name`: The name of the subscription.

        **Options**

        * `consumer_type`:
          Select the subscription type to be used when subscribing to the topic.
        * `schema`:
          Define the schema of the data that will be received by this consumer.
        * `message_listener`:
          Sets a message listener for the consumer. When the listener is set,
          the application will receive messages through it. Calls to
          `consumer.receive()` will not be allowed. The listener function needs
          to accept (consumer, message), for example:

                #!python
                def my_listener(consumer, message):
                    # process message
                    consumer.acknowledge(message)

        * `receiver_queue_size`:
          Sets the size of the consumer receive queue. The consumer receive
          queue controls how many messages can be accumulated by the consumer
          before the application calls `receive()`. Using a higher value could
          potentially increase the consumer throughput at the expense of higher
          memory utilization. Setting the consumer queue size to zero decreases
          the throughput of the consumer by disabling pre-fetching of messages.
          This approach improves the message distribution on shared subscription
          by pushing messages only to those consumers that are ready to process
          them. Neither receive with timeout nor partitioned topics can be used
          if the consumer queue size is zero. The `receive()` function call
          should not be interrupted when the consumer queue size is zero. The
          default value is 1000 messages and should work well for most use
          cases.
        * `max_total_receiver_queue_size_across_partitions`:
          Set the max total receiver queue size across partitions.
          This setting will be used to reduce the receiver queue size for individual partitions
        * `consumer_name`:
          Sets the consumer name.
        * `unacked_messages_timeout_ms`:
          Sets the timeout in milliseconds for unacknowledged messages. The
          timeout needs to be greater than 10 seconds. An exception is thrown if
          the given value is less than 10 seconds. If a successful
          acknowledgement is not sent within the timeout, all the unacknowledged
          messages are redelivered.
        * `negative_ack_redelivery_delay_ms`:
          The delay after which to redeliver the messages that failed to be
          processed (with the `consumer.negative_acknowledge()`)
        * `broker_consumer_stats_cache_time_ms`:
          Sets the time duration for which the broker-side consumer stats will
          be cached in the client.
        * `is_read_compacted`:
          Selects whether to read the compacted version of the topic
        * `properties`:
          Sets the properties for the consumer. The properties associated with a consumer
          can be used for identify a consumer at broker side.
        * `pattern_auto_discovery_period`:
          Periods of seconds for consumer to auto discover match topics.
        * `initial_position`:
          Set the initial position of a consumer when subscribing to the topic.
          It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
          Default: `Latest`.
        * `crypto_key_reader`:
          Symmetric encryption class implementation, configuring public key encryption messages for the producer
          and private key decryption messages for the consumer
        * `replicate_subscription_state_enabled`:
          Set whether the subscription status should be replicated.
          Default: `False`.
        """
        # The `schema` parameter shadows the `schema` module inside this
        # method, hence the module-level `_schema` alias for the base class.
        _check_type(str, subscription_name, 'subscription_name')
        _check_type(ConsumerType, consumer_type, 'consumer_type')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type(int, receiver_queue_size, 'receiver_queue_size')
        _check_type(int, max_total_receiver_queue_size_across_partitions,
                    'max_total_receiver_queue_size_across_partitions')
        _check_type_or_none(str, consumer_name, 'consumer_name')
        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
        _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
        _check_type(bool, is_read_compacted, 'is_read_compacted')
        _check_type_or_none(dict, properties, 'properties')
        _check_type(InitialPosition, initial_position, 'initial_position')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
        _check_type(bool, replicate_subscription_state_enabled, 'replicate_subscription_state_enabled')

        conf = _pulsar.ConsumerConfiguration()
        conf.consumer_type(consumer_type)
        conf.read_compacted(is_read_compacted)
        if message_listener:
            conf.message_listener(_listener_wrapper(message_listener, schema))
        conf.receiver_queue_size(receiver_queue_size)
        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
        if consumer_name:
            conf.consumer_name(consumer_name)
        if unacked_messages_timeout_ms:
            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)

        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
        if properties:
            for k, v in properties.items():
                conf.property(k, v)
        conf.subscription_initial_position(initial_position)

        conf.schema(schema.schema_info())

        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)

        c = Consumer()
        if isinstance(topic, str):
            # Single topic
            c._consumer = self._client.subscribe(topic, subscription_name, conf)
        elif isinstance(topic, list):
            # List of topics
            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
        elif isinstance(topic, _retype):
            # Regex pattern
            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
        else:
            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")

        c._client = self
        c._schema = schema
        self._consumers.append(c)
        return c

    def create_reader(self, topic, start_message_id,
                      schema=schema.BytesSchema(),
                      reader_listener=None,
                      receiver_queue_size=1000,
                      reader_name=None,
                      subscription_role_prefix=None,
                      is_read_compacted=False,
                      crypto_key_reader=None
                      ):
        """
        Create a reader on a particular topic

        Returns a `Reader`, which is also recorded on this client.

        **Args**

        * `topic`: The name of the topic.
        * `start_message_id`: The initial reader positioning is done by specifying a message id.
          Both `_pulsar.MessageId` objects and `MessageId` wrapper instances are accepted.
          The options are:
            * `MessageId.earliest`: Start reading from the earliest message available in the topic
            * `MessageId.latest`: Start reading from the end topic, only getting messages published
              after the reader was created
            * `MessageId`: When passing a particular message id, the reader will position itself on
              that specific position. The first message to be read will be the message next to the
              specified messageId. Message id can be serialized into a string and deserialized
              back into a `MessageId` object:

                    # Serialize to string
                    s = msg.message_id().serialize()

                    # Deserialize from string
                    msg_id = MessageId.deserialize(s)

        **Options**

        * `schema`:
          Define the schema of the data that will be received by this reader.
        * `reader_listener`:
          Sets a message listener for the reader. When the listener is set,
          the application will receive messages through it. Calls to
          `reader.read_next()` will not be allowed. The listener function needs
          to accept (reader, message), for example:

                def my_listener(reader, message):
                    # process message
                    pass

        * `receiver_queue_size`:
          Sets the size of the reader receive queue. The reader receive
          queue controls how many messages can be accumulated by the reader
          before the application calls `read_next()`. Using a higher value could
          potentially increase the reader throughput at the expense of higher
          memory utilization.
        * `reader_name`:
          Sets the reader name.
        * `subscription_role_prefix`:
          Sets the subscription role prefix.
        * `is_read_compacted`:
          Selects whether to read the compacted version of the topic
        * `crypto_key_reader`:
          Symmetric encryption class implementation, configuring public key encryption messages for the producer
          and private key decryption messages for the consumer
        """
        # Accept this module's MessageId wrapper as well, the same way
        # MessageBatch.with_message_id does, by unwrapping it first.
        if isinstance(start_message_id, MessageId):
            start_message_id = start_message_id._msg_id

        _check_type(str, topic, 'topic')
        _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
        _check_type(_schema.Schema, schema, 'schema')
        _check_type(int, receiver_queue_size, 'receiver_queue_size')
        _check_type_or_none(str, reader_name, 'reader_name')
        _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
        _check_type(bool, is_read_compacted, 'is_read_compacted')
        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')

        conf = _pulsar.ReaderConfiguration()
        if reader_listener:
            conf.reader_listener(_listener_wrapper(reader_listener, schema))
        conf.receiver_queue_size(receiver_queue_size)
        if reader_name:
            conf.reader_name(reader_name)
        if subscription_role_prefix:
            conf.subscription_role_prefix(subscription_role_prefix)
        conf.schema(schema.schema_info())
        conf.read_compacted(is_read_compacted)
        if crypto_key_reader:
            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

        c = Reader()
        c._reader = self._client.create_reader(topic, start_message_id, conf)
        c._client = self
        c._schema = schema
        self._consumers.append(c)
        return c

    def get_topic_partitions(self, topic):
        """
        Get the list of partitions for a given topic.

        If the topic is partitioned, this will return a list of partition names. If the topic is not
        partitioned, the returned list will contain the topic name itself.

        This can be used to discover the partitions and create Reader, Consumer or Producer
        instances directly on a particular partition.
        :param topic: the topic name to lookup
        :return: a list of partition name
        """
        _check_type(str, topic, 'topic')
        return self._client.get_topic_partitions(topic)

    def shutdown(self):
        """
        Perform immediate shutdown of Pulsar client.

        Release all resources and close all producer, consumer, and readers without waiting
        for ongoing operations to complete.
        """
        self._client.shutdown()

    def close(self):
        """
        Close the client and all the associated producers and consumers
        """
        self._client.close()
| |
| |
| class Producer: |
| """ |
| The Pulsar message producer, used to publish messages on a topic. |
| """ |
| |
| def topic(self): |
| """ |
| Return the topic which producer is publishing to |
| """ |
| return self._producer.topic() |
| |
| def producer_name(self): |
| """ |
| Return the producer name which could have been assigned by the |
| system or specified by the client |
| """ |
| return self._producer.producer_name() |
| |
| def last_sequence_id(self): |
| """ |
| Get the last sequence id that was published by this producer. |
| |
| This represent either the automatically assigned or custom sequence id |
| (set on the `MessageBuilder`) that was published and acknowledged by the broker. |
| |
| After recreating a producer with the same producer name, this will return the |
| last message that was published in the previous producer session, or -1 if |
| there no message was ever published. |
| """ |
| return self._producer.last_sequence_id() |
| |
| def send(self, content, |
| properties=None, |
| partition_key=None, |
| sequence_id=None, |
| replication_clusters=None, |
| disable_replication=False, |
| event_timestamp=None, |
| deliver_at=None, |
| deliver_after=None, |
| ): |
| """ |
| Publish a message on the topic. Blocks until the message is acknowledged |
| |
| Returns a `MessageId` object that represents where the message is persisted. |
| |
| **Args** |
| |
| * `content`: |
| A `bytes` object with the message payload. |
| |
| **Options** |
| |
| * `properties`: |
| A dict of application-defined string properties. |
| * `partition_key`: |
| Sets the partition key for message routing. A hash of this key is used |
| to determine the message's topic partition. |
| * `sequence_id`: |
| Specify a custom sequence id for the message being published. |
| * `replication_clusters`: |
| Override namespace replication clusters. Note that it is the caller's |
| responsibility to provide valid cluster names and that all clusters |
| have been previously configured as topics. Given an empty list, |
| the message will replicate according to the namespace configuration. |
| * `disable_replication`: |
| Do not replicate this message. |
| * `event_timestamp`: |
| Timestamp of the event creation, in milliseconds |
| * `deliver_at`: |
| Specify that this message should not be delivered earlier than the |
| specified timestamp. |
| The timestamp is in milliseconds and based on UTC |
| * `deliver_after`: |
| Specify a delay in timedelta for the delivery of the messages. |
| |
| """ |
| msg = self._build_msg(content, properties, partition_key, sequence_id, |
| replication_clusters, disable_replication, event_timestamp, |
| deliver_at, deliver_after) |
| return MessageId.deserialize(self._producer.send(msg)) |
| |
| def send_async(self, content, callback, |
| properties=None, |
| partition_key=None, |
| sequence_id=None, |
| replication_clusters=None, |
| disable_replication=False, |
| event_timestamp=None, |
| deliver_at=None, |
| deliver_after=None, |
| ): |
| """ |
| Send a message asynchronously. |
| |
| The `callback` will be invoked once the message has been acknowledged |
| by the broker. |
| |
| Example: |
| |
| #!python |
| def callback(res, msg_id): |
| print('Message published: %s' % res) |
| |
| producer.send_async(msg, callback) |
| |
| When the producer queue is full, by default the message will be rejected |
| and the callback invoked with an error code. |
| |
| **Args** |
| |
| * `content`: |
| A `bytes` object with the message payload. |
| |
| **Options** |
| |
| * `properties`: |
| A dict of application-defined string properties. |
| * `partition_key`: |
| Sets the partition key for the message routing. A hash of this key is |
| used to determine the message's topic partition. |
| * `sequence_id`: |
| Specify a custom sequence id for the message being published. |
| * `replication_clusters`: Override namespace replication clusters. Note |
| that it is the caller's responsibility to provide valid cluster names |
| and that all clusters have been previously configured as topics. |
| Given an empty list, the message will replicate per the namespace |
| configuration. |
| * `disable_replication`: |
| Do not replicate this message. |
| * `event_timestamp`: |
| Timestamp of the event creation, in milliseconds |
| * `deliver_at`: |
| Specify that this message should not be delivered earlier than the |
| specified timestamp. |
| The timestamp is in milliseconds and based on UTC |
| * `deliver_after`: |
| Specify a delay in timedelta for the delivery of the messages. |
| """ |
| msg = self._build_msg(content, properties, partition_key, sequence_id, |
| replication_clusters, disable_replication, event_timestamp, |
| deliver_at, deliver_after) |
| self._producer.send_async(msg, callback) |
| |
| |
| def flush(self): |
| """ |
| Flush all the messages buffered in the client and wait until all messages have been |
| successfully persisted |
| """ |
| self._producer.flush() |
| |
| |
| def close(self): |
| """ |
| Close the producer. |
| """ |
| self._producer.close() |
| |
| def _build_msg(self, content, properties, partition_key, sequence_id, |
| replication_clusters, disable_replication, event_timestamp, |
| deliver_at, deliver_after): |
| data = self._schema.encode(content) |
| |
| _check_type(bytes, data, 'data') |
| _check_type_or_none(dict, properties, 'properties') |
| _check_type_or_none(str, partition_key, 'partition_key') |
| _check_type_or_none(int, sequence_id, 'sequence_id') |
| _check_type_or_none(list, replication_clusters, 'replication_clusters') |
| _check_type(bool, disable_replication, 'disable_replication') |
| _check_type_or_none(int, event_timestamp, 'event_timestamp') |
| _check_type_or_none(int, deliver_at, 'deliver_at') |
| _check_type_or_none(timedelta, deliver_after, 'deliver_after') |
| |
| mb = _pulsar.MessageBuilder() |
| mb.content(data) |
| if properties: |
| for k, v in properties.items(): |
| mb.property(k, v) |
| if partition_key: |
| mb.partition_key(partition_key) |
| if sequence_id: |
| mb.sequence_id(sequence_id) |
| if replication_clusters: |
| mb.replication_clusters(replication_clusters) |
| if disable_replication: |
| mb.disable_replication(disable_replication) |
| if event_timestamp: |
| mb.event_timestamp(event_timestamp) |
| if deliver_at: |
| mb.deliver_at(deliver_at) |
| if deliver_after: |
| mb.deliver_after(deliver_after) |
| |
| return mb.build() |
| |
| def is_connected(self): |
| """ |
| Check if the producer is connected or not. |
| """ |
| return self._producer.is_connected() |
| |
| |
| class Consumer: |
| """ |
| Pulsar consumer. |
| """ |
| |
| def topic(self): |
| """ |
| Return the topic this consumer is subscribed to. |
| """ |
| return self._consumer.topic() |
| |
| def subscription_name(self): |
| """ |
| Return the subscription name. |
| """ |
| return self._consumer.subscription_name() |
| |
| def unsubscribe(self): |
| """ |
| Unsubscribe the current consumer from the topic. |
| |
| This method will block until the operation is completed. Once the |
| consumer is unsubscribed, no more messages will be received and |
| subsequent new messages will not be retained for this consumer. |
| |
| This consumer object cannot be reused. |
| """ |
| return self._consumer.unsubscribe() |
| |
| def receive(self, timeout_millis=None): |
| """ |
| Receive a single message. |
| |
| If a message is not immediately available, this method will block until |
| a new message is available. |
| |
| **Options** |
| |
| * `timeout_millis`: |
| If specified, the receive will raise an exception if a message is not |
| available within the timeout. |
| """ |
| if timeout_millis is None: |
| msg = self._consumer.receive() |
| else: |
| _check_type(int, timeout_millis, 'timeout_millis') |
| msg = self._consumer.receive(timeout_millis) |
| |
| m = Message() |
| m._message = msg |
| m._schema = self._schema |
| return m |
| |
| def acknowledge(self, message): |
| """ |
| Acknowledge the reception of a single message. |
| |
| This method will block until an acknowledgement is sent to the broker. |
| After that, the message will not be re-delivered to this consumer. |
| |
| **Args** |
| |
| * `message`: |
| The received message or message id. |
| """ |
| if isinstance(message, Message): |
| self._consumer.acknowledge(message._message) |
| else: |
| self._consumer.acknowledge(message) |
| |
| def acknowledge_cumulative(self, message): |
| """ |
| Acknowledge the reception of all the messages in the stream up to (and |
| including) the provided message. |
| |
| This method will block until an acknowledgement is sent to the broker. |
| After that, the messages will not be re-delivered to this consumer. |
| |
| **Args** |
| |
| * `message`: |
| The received message or message id. |
| """ |
| if isinstance(message, Message): |
| self._consumer.acknowledge_cumulative(message._message) |
| else: |
| self._consumer.acknowledge_cumulative(message) |
| |
| def negative_acknowledge(self, message): |
| """ |
| Acknowledge the failure to process a single message. |
| |
| When a message is "negatively acked" it will be marked for redelivery after |
| some fixed delay. The delay is configurable when constructing the consumer |
| with the `negative_ack_redelivery_delay_ms` option of `Client.subscribe()`. |
| |
| This call is not blocking. |
| |
| **Args** |
| |
| * `message`: |
| The received message or message id. |
| """ |
| if isinstance(message, Message): |
| self._consumer.negative_acknowledge(message._message) |
| else: |
| self._consumer.negative_acknowledge(message) |
| |
| def pause_message_listener(self): |
| """ |
| Pause receiving messages via the `message_listener` until |
| `resume_message_listener()` is called. |
| """ |
| self._consumer.pause_message_listener() |
| |
| def resume_message_listener(self): |
| """ |
| Resume receiving the messages via the message listener. |
| Asynchronously receive all the messages enqueued from the time |
| `pause_message_listener()` was called. |
| """ |
| self._consumer.resume_message_listener() |
| |
| def redeliver_unacknowledged_messages(self): |
| """ |
| Redelivers all the unacknowledged messages. In failover mode, the |
| request is ignored if the consumer is not active for the given topic. In |
| shared mode, the consumer's messages to be redelivered are distributed |
| across all the connected consumers. This is a non-blocking call and |
| doesn't throw an exception. In case the connection breaks, the messages |
| are redelivered after reconnect. |
| """ |
| self._consumer.redeliver_unacknowledged_messages() |
| |
| def seek(self, messageid): |
| """ |
| Reset the subscription associated with this consumer to a specific message id or publish timestamp. |
| The message id can either be a specific message or represent the first or last messages in the topic. |
| Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead perform the |
| seek() on the individual partitions. |
| |
| **Args** |
| |
| * `messageid`: |
| The message id for seek, OR an integer event time to seek to |
| """ |
| self._consumer.seek(messageid) |
| |
| def close(self): |
| """ |
| Close the consumer. |
| """ |
| self._consumer.close() |
| self._client._consumers.remove(self) |
| |
| def is_connected(self): |
| """ |
| Check if the consumer is connected or not. |
| """ |
| return self._consumer.is_connected() |
| |
| |
| |
| class Reader: |
| """ |
| Pulsar topic reader. |
| """ |
| |
| def topic(self): |
| """ |
| Return the topic this reader is reading from. |
| """ |
| return self._reader.topic() |
| |
| def read_next(self, timeout_millis=None): |
| """ |
| Read a single message. |
| |
| If a message is not immediately available, this method will block until |
| a new message is available. |
| |
| **Options** |
| |
| * `timeout_millis`: |
| If specified, the receive will raise an exception if a message is not |
| available within the timeout. |
| """ |
| if timeout_millis is None: |
| msg = self._reader.read_next() |
| else: |
| _check_type(int, timeout_millis, 'timeout_millis') |
| msg = self._reader.read_next(timeout_millis) |
| |
| m = Message() |
| m._message = msg |
| m._schema = self._schema |
| return m |
| |
| def has_message_available(self): |
| """ |
| Check if there is any message available to read from the current position. |
| """ |
| return self._reader.has_message_available(); |
| |
| def seek(self, messageid): |
| """ |
| Reset this reader to a specific message id or publish timestamp. |
| The message id can either be a specific message or represent the first or last messages in the topic. |
| Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead perform the |
| seek() on the individual partitions. |
| |
| **Args** |
| |
| * `messageid`: |
| The message id for seek, OR an integer event time to seek to |
| """ |
| self._reader.seek(messageid) |
| |
| def close(self): |
| """ |
| Close the reader. |
| """ |
| self._reader.close() |
| self._client._consumers.remove(self) |
| |
| def is_connected(self): |
| """ |
| Check if the reader is connected or not. |
| """ |
| return self._reader.is_connected() |
| |
| |
| class CryptoKeyReader: |
| """ |
| Default crypto key reader implementation |
| """ |
| def __init__(self, public_key_path, private_key_path): |
| """ |
| Create crypto key reader. |
| |
| **Args** |
| |
| * `public_key_path`: Path to the public key |
| * `private_key_path`: Path to private key |
| """ |
| _check_type(str, public_key_path, 'public_key_path') |
| _check_type(str, private_key_path, 'private_key_path') |
| self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) |
| |
| def _check_type(var_type, var, name): |
| if not isinstance(var, var_type): |
| raise ValueError("Argument %s is expected to be of type '%s' and not '%s'" |
| % (name, var_type.__name__, type(var).__name__)) |
| |
| |
| def _check_type_or_none(var_type, var, name): |
| if var is not None and not isinstance(var, var_type): |
| raise ValueError("Argument %s is expected to be either None or of type '%s'" |
| % (name, var_type.__name__)) |
| |
| |
| def _listener_wrapper(listener, schema): |
| def wrapper(consumer, msg): |
| c = Consumer() |
| c._consumer = consumer |
| m = Message() |
| m._message = msg |
| m._schema = schema |
| listener(c, m) |
| return wrapper |
| </code></pre> |
| </div> |
| |
| </header> |
| |
| <section id="section-items"> |
| |
| |
| <h2 class="section-title" id="header-classes">Classes</h2> |
| |
| <div class="item"> |
| <p id="pulsar.Authentication" class="name">class <span class="ident">Authentication</span></p> |
| |
| |
| <div class="desc"><p>Authentication provider object. Used to load authentication from an external |
| shared library.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Authentication', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Authentication" class="source"> |
| <pre><code>class Authentication: |
| """ |
| Authentication provider object. Used to load authentication from an external |
| shared library. |
| """ |
| def __init__(self, dynamicLibPath, authParamsString): |
| """ |
| Create the authentication provider instance. |
| |
| **Args** |
| |
| * `dynamicLibPath`: Path to the authentication provider shared library |
| (such as `tls.so`) |
| * `authParamsString`: Comma-separated list of provider-specific |
| configuration params |
| """ |
| _check_type(str, dynamicLibPath, 'dynamicLibPath') |
| _check_type(str, authParamsString, 'authParamsString') |
| self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString) |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.Authentication">Authentication</a></li> |
| </ul> |
| <h3>Instance variables</h3> |
| <div class="item"> |
| <p id="pulsar.Authentication.auth" class="name">var <span class="ident">auth</span></p> |
| |
| |
| |
| |
| <div class="source_cont"> |
| </div> |
| |
| </div> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Authentication.__init__"> |
| <p>def <span class="ident">__init__</span>(</p><p>self, dynamicLibPath, authParamsString)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Create the authentication provider instance.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>dynamicLibPath</code>: Path to the authentication provider shared library |
| (such as <code>tls.so</code>)</li> |
| <li><code>authParamsString</code>: Comma-separated list of provider-specific |
| configuration params</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Authentication.__init__', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Authentication.__init__" class="source"> |
| <pre><code>def __init__(self, dynamicLibPath, authParamsString): |
| """ |
| Create the authentication provider instance. |
| **Args** |
| * `dynamicLibPath`: Path to the authentication provider shared library |
| (such as `tls.so`) |
| * `authParamsString`: Comma-separated list of provider-specific |
| configuration params |
| """ |
| _check_type(str, dynamicLibPath, 'dynamicLibPath') |
| _check_type(str, authParamsString, 'authParamsString') |
| self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.AuthenticationAthenz" class="name">class <span class="ident">AuthenticationAthenz</span></p> |
| |
| |
| <div class="desc"><p>Athenz Authentication implementation</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationAthenz', this);">Show source ≡</a></p> |
| <div id="source-pulsar.AuthenticationAthenz" class="source"> |
| <pre><code>class AuthenticationAthenz(Authentication): |
| """ |
| Athenz Authentication implementation |
| """ |
| def __init__(self, auth_params_string): |
| """ |
| Create the Athenz authentication provider instance. |
| |
| **Args** |
| |
| * `auth_params_string`: JSON encoded configuration for Athenz client |
| """ |
| _check_type(str, auth_params_string, 'auth_params_string') |
| self.auth = _pulsar.AuthenticationAthenz(auth_params_string) |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.AuthenticationAthenz">AuthenticationAthenz</a></li> |
| <li><a href="#pulsar.Authentication">Authentication</a></li> |
| </ul> |
| <h3>Instance variables</h3> |
| <div class="item"> |
| <p id="pulsar.AuthenticationAthenz.auth" class="name">var <span class="ident">auth</span></p> |
| |
| <p class="inheritance"> |
| <strong>Inheritance:</strong> |
| <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code> |
| </p> |
| |
| |
| |
| <div class="source_cont"> |
| </div> |
| |
| </div> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.AuthenticationAthenz.__init__"> |
| <p>def <span class="ident">__init__</span>(</p><p>self, auth_params_string)</p> |
| </div> |
| |
| <p class="inheritance"> |
| <strong>Inheritance:</strong> |
| <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code> |
| </p> |
| |
| |
| |
| <div class="desc"><p>Create the Athenz authentication provider instance.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>auth_params_string</code>: JSON encoded configuration for Athenz client</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationAthenz.__init__', this);">Show source ≡</a></p> |
| <div id="source-pulsar.AuthenticationAthenz.__init__" class="source"> |
| <pre><code>def __init__(self, auth_params_string): |
| """ |
| Create the Athenz authentication provider instance. |
| **Args** |
| * `auth_params_string`: JSON encoded configuration for Athenz client |
| """ |
| _check_type(str, auth_params_string, 'auth_params_string') |
| self.auth = _pulsar.AuthenticationAthenz(auth_params_string) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.AuthenticationOauth2" class="name">class <span class="ident">AuthenticationOauth2</span></p> |
| |
| |
| <div class="desc"><p>Oauth2 Authentication implementation</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationOauth2', this);">Show source ≡</a></p> |
| <div id="source-pulsar.AuthenticationOauth2" class="source"> |
| <pre><code>class AuthenticationOauth2(Authentication): |
| """ |
| Oauth2 Authentication implementation |
| """ |
| def __init__(self, auth_params_string): |
| """ |
| Create the Oauth2 authentication provider instance. |
| |
| **Args** |
| |
| * `auth_params_string`: JSON encoded configuration for Oauth2 client |
| """ |
| _check_type(str, auth_params_string, 'auth_params_string') |
| self.auth = _pulsar.AuthenticationOauth2(auth_params_string) |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.AuthenticationOauth2">AuthenticationOauth2</a></li> |
| <li><a href="#pulsar.Authentication">Authentication</a></li> |
| </ul> |
| <h3>Instance variables</h3> |
| <div class="item"> |
| <p id="pulsar.AuthenticationOauth2.auth" class="name">var <span class="ident">auth</span></p> |
| |
| <p class="inheritance"> |
| <strong>Inheritance:</strong> |
| <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code> |
| </p> |
| |
| |
| |
| <div class="source_cont"> |
| </div> |
| |
| </div> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.AuthenticationOauth2.__init__"> |
| <p>def <span class="ident">__init__</span>(</p><p>self, auth_params_string)</p> |
| </div> |
| |
| <p class="inheritance"> |
| <strong>Inheritance:</strong> |
| <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code> |
| </p> |
| |
| |
| |
| <div class="desc"><p>Create the Oauth2 authentication provider instance.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>auth_params_string</code>: JSON encoded configuration for Oauth2 client</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationOauth2.__init__', this);">Show source ≡</a></p> |
| <div id="source-pulsar.AuthenticationOauth2.__init__" class="source"> |
| <pre><code>def __init__(self, auth_params_string): |
| """ |
| Create the Oauth2 authentication provider instance. |
| **Args** |
| * `auth_params_string`: JSON encoded configuration for Oauth2 client |
| """ |
| _check_type(str, auth_params_string, 'auth_params_string') |
| self.auth = _pulsar.AuthenticationOauth2(auth_params_string) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.AuthenticationTLS" class="name">class <span class="ident">AuthenticationTLS</span></p> |
| |
| |
| <div class="desc"><p>TLS Authentication implementation</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationTLS', this);">Show source ≡</a></p> |
| <div id="source-pulsar.AuthenticationTLS" class="source"> |
| <pre><code>class AuthenticationTLS(Authentication): |
| """ |
| TLS Authentication implementation |
| """ |
| def __init__(self, certificate_path, private_key_path): |
| """ |
| Create the TLS authentication provider instance. |
| |
| **Args** |
| |
| * `certificate_path`: Path to the public certificate |
| * `private_key_path`: Path to private TLS key |
| """ |
| _check_type(str, certificate_path, 'certificate_path') |
| _check_type(str, private_key_path, 'private_key_path') |
| self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path) |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.AuthenticationTLS">AuthenticationTLS</a></li> |
| <li><a href="#pulsar.Authentication">Authentication</a></li> |
| </ul> |
| <h3>Instance variables</h3> |
| <div class="item"> |
| <p id="pulsar.AuthenticationTLS.auth" class="name">var <span class="ident">auth</span></p> |
| |
| <p class="inheritance"> |
| <strong>Inheritance:</strong> |
| <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code> |
| </p> |
| |
| |
| |
| <div class="source_cont"> |
| </div> |
| |
| </div> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.AuthenticationTLS.__init__"> |
| <p>def <span class="ident">__init__</span>(</p><p>self, certificate_path, private_key_path)</p> |
| </div> |
| |
| <p class="inheritance"> |
| <strong>Inheritance:</strong> |
| <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code> |
| </p> |
| |
| |
| |
| <div class="desc"><p>Create the TLS authentication provider instance.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>certificate_path</code>: Path to the public certificate</li> |
| <li><code>private_key_path</code>: Path to private TLS key</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationTLS.__init__', this);">Show source ≡</a></p> |
| <div id="source-pulsar.AuthenticationTLS.__init__" class="source"> |
| <pre><code>def __init__(self, certificate_path, private_key_path): |
| """ |
| Create the TLS authentication provider instance. |
| **Args** |
| * `certificate_path`: Path to the public certificate |
| * `private_key_path`: Path to private TLS key |
| """ |
| _check_type(str, certificate_path, 'certificate_path') |
| _check_type(str, private_key_path, 'private_key_path') |
| self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.AuthenticationToken" class="name">class <span class="ident">AuthenticationToken</span></p> |
| |
| |
| <div class="desc"><p>Token based authentication implementation</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationToken', this);">Show source ≡</a></p> |
| <div id="source-pulsar.AuthenticationToken" class="source"> |
| <pre><code>class AuthenticationToken(Authentication): |
| """ |
| Token based authentication implementation |
| """ |
| def __init__(self, token): |
| """ |
| Create the token authentication provider instance. |
| |
| **Args** |
| |
| * `token`: A string containing the token or a function that provides a |
| string with the token |
| """ |
| if not (isinstance(token, str) or callable(token)): |
| raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'") |
| self.auth = _pulsar.AuthenticationToken(token) |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.AuthenticationToken">AuthenticationToken</a></li> |
| <li><a href="#pulsar.Authentication">Authentication</a></li> |
| </ul> |
| <h3>Instance variables</h3> |
| <div class="item"> |
| <p id="pulsar.AuthenticationToken.auth" class="name">var <span class="ident">auth</span></p> |
| |
| <p class="inheritance"> |
| <strong>Inheritance:</strong> |
| <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code> |
| </p> |
| |
| |
| |
| <div class="source_cont"> |
| </div> |
| |
| </div> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.AuthenticationToken.__init__"> |
| <p>def <span class="ident">__init__</span>(</p><p>self, token)</p> |
| </div> |
| |
| <p class="inheritance"> |
| <strong>Inheritance:</strong> |
| <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code> |
| </p> |
| |
| |
| |
| <div class="desc"><p>Create the token authentication provider instance.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>token</code>: A string containing the token or a function that provides a |
| string with the token</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationToken.__init__', this);">Show source ≡</a></p> |
| <div id="source-pulsar.AuthenticationToken.__init__" class="source"> |
| <pre><code>def __init__(self, token): |
| """ |
| Create the token authentication provider instance. |
| **Args** |
| * `token`: A string containing the token or a function that provides a |
| string with the token |
| """ |
| if not (isinstance(token, str) or callable(token)): |
| raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'") |
| self.auth = _pulsar.AuthenticationToken(token) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.Client" class="name">class <span class="ident">Client</span></p> |
| |
| |
| <div class="desc"><p>The Pulsar client. A single client instance can be used to create producers |
| and consumers on multiple topics.</p> |
| <p>The client will share the same connection pool and threads across all |
| producers and consumers.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Client" class="source"> |
| <pre><code>class Client: |
| """ |
| The Pulsar client. A single client instance can be used to create producers |
| and consumers on multiple topics. |
| |
| The client will share the same connection pool and threads across all |
| producers and consumers. |
| """ |
| |
| def __init__(self, service_url, |
| authentication=None, |
| operation_timeout_seconds=30, |
| io_threads=1, |
| message_listener_threads=1, |
| concurrent_lookup_requests=50000, |
| log_conf_file_path=None, |
| use_tls=False, |
| tls_trust_certs_file_path=None, |
| tls_allow_insecure_connection=False, |
| tls_validate_hostname=False, |
| logger=None, |
| connection_timeout_ms=10000, |
| ): |
| """ |
| Create a new Pulsar client instance. |
| |
| **Args** |
| |
| * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/ |
| |
| **Options** |
| |
| * `authentication`: |
| Set the authentication provider to be used with the broker. For example: |
| `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2` |
| * `operation_timeout_seconds`: |
| Set timeout on client operations (subscribe, create producer, close, |
| unsubscribe). |
| * `io_threads`: |
| Set the number of IO threads to be used by the Pulsar client. |
| * `message_listener_threads`: |
| Set the number of threads to be used by the Pulsar client when |
| delivering messages through message listener. The default is 1 thread |
| per Pulsar client. If using more than 1 thread, messages for distinct |
| `message_listener`s will be delivered in different threads, however a |
| single `MessageListener` will always be assigned to the same thread. |
| * `concurrent_lookup_requests`: |
| Number of concurrent lookup-requests allowed on each broker connection |
| to prevent overload on the broker. |
| * `log_conf_file_path`: |
| Initialize log4cxx from a configuration file. |
| * `use_tls`: |
| Configure whether to use TLS encryption on the connection. This setting |
| is deprecated. TLS will be automatically enabled if the `service_url` is |
| set to `pulsar+ssl://` or `https://` |
| * `tls_trust_certs_file_path`: |
| Set the path to the trusted TLS certificate file. If empty defaults to |
| certifi. |
| * `tls_allow_insecure_connection`: |
| Configure whether the Pulsar client accepts untrusted TLS certificates |
| from the broker. |
| * `tls_validate_hostname`: |
| Configure whether the Pulsar client validates that the hostname of the |
| endpoint matches the common name on the TLS certificate presented by |
| the endpoint. |
| * `logger`: |
| Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`. |
| * `connection_timeout_ms`: |
| Set timeout in milliseconds on TCP connections. |
| """ |
| _check_type(str, service_url, 'service_url') |
| _check_type_or_none(Authentication, authentication, 'authentication') |
| _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds') |
| _check_type(int, connection_timeout_ms, 'connection_timeout_ms') |
| _check_type(int, io_threads, 'io_threads') |
| _check_type(int, message_listener_threads, 'message_listener_threads') |
| _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests') |
| _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path') |
| _check_type(bool, use_tls, 'use_tls') |
| _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') |
| _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection') |
| _check_type(bool, tls_validate_hostname, 'tls_validate_hostname') |
| _check_type_or_none(logging.Logger, logger, 'logger') |
| |
| conf = _pulsar.ClientConfiguration() |
| if authentication: |
| conf.authentication(authentication.auth) |
| conf.operation_timeout_seconds(operation_timeout_seconds) |
| conf.connection_timeout(connection_timeout_ms) |
| conf.io_threads(io_threads) |
| conf.message_listener_threads(message_listener_threads) |
| conf.concurrent_lookup_requests(concurrent_lookup_requests) |
| if log_conf_file_path: |
| conf.log_conf_file_path(log_conf_file_path) |
| if logger: |
| conf.set_logger(logger) |
| if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): |
| conf.use_tls(True) |
| if tls_trust_certs_file_path: |
| conf.tls_trust_certs_file_path(tls_trust_certs_file_path) |
| else: |
| conf.tls_trust_certs_file_path(certifi.where()) |
| conf.tls_allow_insecure_connection(tls_allow_insecure_connection) |
| conf.tls_validate_hostname(tls_validate_hostname) |
| self._client = _pulsar.Client(service_url, conf) |
| self._consumers = [] |
| |
| def create_producer(self, topic, |
| producer_name=None, |
| schema=schema.BytesSchema(), |
| initial_sequence_id=None, |
| send_timeout_millis=30000, |
| compression_type=CompressionType.NONE, |
| max_pending_messages=1000, |
| max_pending_messages_across_partitions=50000, |
| block_if_queue_full=False, |
| batching_enabled=False, |
| batching_max_messages=1000, |
| batching_max_allowed_size_in_bytes=128*1024, |
| batching_max_publish_delay_ms=10, |
| message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution, |
| lazy_start_partitioned_producers=False, |
| properties=None, |
| batching_type=BatchingType.Default, |
| encryption_key=None, |
| crypto_key_reader=None |
| ): |
| """ |
| Create a new producer on a given topic. |
| |
| **Args** |
| |
| * `topic`: |
| The topic name |
| |
| **Options** |
| |
| * `producer_name`: |
| Specify a name for the producer. If not assigned, |
| the system will generate a globally unique name which can be accessed |
| with `Producer.producer_name()`. When specifying a name, it is up to |
| the user to ensure that, for a given topic, the producer name is unique |
| across all Pulsar's clusters. |
| * `schema`: |
| Define the schema of the data that will be published by this producer. |
| The schema will be used for two purposes: |
| - Validate the data format against the topic defined schema |
| - Perform serialization/deserialization between data and objects |
| An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`. |
| * `initial_sequence_id`: |
| Set the baseline for the sequence ids for messages |
| published by the producer. First message will be using |
| `(initialSequenceId + 1)` as its sequence id and subsequent messages will |
| be assigned incremental sequence ids, if not otherwise specified. |
| * `send_timeout_millis`: |
| If a message is not acknowledged by the server before the |
| `send_timeout` expires, an error will be reported. |
| * `compression_type`: |
| Set the compression type for the producer. By default, message |
| payloads are not compressed. Supported compression types are |
| `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`. |
| ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that |
| release in order to be able to receive messages compressed with ZSTD. |
| SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that |
| release in order to be able to receive messages compressed with SNAPPY. |
| * `max_pending_messages`: |
| Set the max size of the queue holding the messages pending to receive |
| an acknowledgment from the broker. |
| * `max_pending_messages_across_partitions`: |
| Set the max size of the queue holding the messages pending to receive |
| an acknowledgment across partitions from the broker. |
| * `block_if_queue_full`: Set whether `send_async` operations should |
| block when the outgoing message queue is full. |
| * `message_routing_mode`: |
| Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`, |
| other option is `PartitionsRoutingMode.UseSinglePartition` |
| * `lazy_start_partitioned_producers`: |
| This config affects producers of partitioned topics only. It controls whether |
| producers register and connect immediately to the owner broker of each partition |
| or start lazily on demand. The internal producer of one partition is always |
| started eagerly, chosen by the routing policy, but the internal producers of |
| any additional partitions are started on demand, upon receiving their first |
| message. |
| Using this mode can reduce the strain on brokers for topics with large numbers of |
| partitions and when the SinglePartition routing policy is used without keyed messages. |
| Because producer connection can be on demand, this can produce extra send latency |
| for the first messages of a given partition. |
| * `properties`: |
| Sets the properties for the producer. The properties associated with a producer |
| can be used to identify a producer on the broker side. |
| * `batching_type`: |
| Sets the batching type for the producer. |
| There are two batching types: DefaultBatching and KeyBasedBatching. |
| - Default batching |
| incoming single messages: |
| (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) |
| batched into single batch message: |
| [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)] |
| |
| - KeyBasedBatching |
| incoming single messages: |
| (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) |
| batched into single batch message: |
| [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] |
| * `encryption_key`: |
| The key used for symmetric encryption, configured on the producer side. |
| * `crypto_key_reader`: |
| Symmetric encryption class implementation that configures public-key encryption of messages for the producer |
| and private-key decryption of messages for the consumer. |
| """ |
| _check_type(str, topic, 'topic') |
| _check_type_or_none(str, producer_name, 'producer_name') |
| _check_type(_schema.Schema, schema, 'schema') |
| _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id') |
| _check_type(int, send_timeout_millis, 'send_timeout_millis') |
| _check_type(CompressionType, compression_type, 'compression_type') |
| _check_type(int, max_pending_messages, 'max_pending_messages') |
| _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions') |
| _check_type(bool, block_if_queue_full, 'block_if_queue_full') |
| _check_type(bool, batching_enabled, 'batching_enabled') |
| _check_type(int, batching_max_messages, 'batching_max_messages') |
| _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes') |
| _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms') |
| _check_type_or_none(dict, properties, 'properties') |
| _check_type(BatchingType, batching_type, 'batching_type') |
| _check_type_or_none(str, encryption_key, 'encryption_key') |
| _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') |
| _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers') |
| |
| conf = _pulsar.ProducerConfiguration() |
| conf.send_timeout_millis(send_timeout_millis) |
| conf.compression_type(compression_type) |
| conf.max_pending_messages(max_pending_messages) |
| conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) |
| conf.block_if_queue_full(block_if_queue_full) |
| conf.batching_enabled(batching_enabled) |
| conf.batching_max_messages(batching_max_messages) |
| conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes) |
| conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms) |
| conf.partitions_routing_mode(message_routing_mode) |
| conf.batching_type(batching_type) |
| conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) |
| if producer_name: |
| conf.producer_name(producer_name) |
| if initial_sequence_id: |
| conf.initial_sequence_id(initial_sequence_id) |
| if properties: |
| for k, v in properties.items(): |
| conf.property(k, v) |
| |
| conf.schema(schema.schema_info()) |
| if encryption_key: |
| conf.encryption_key(encryption_key) |
| if crypto_key_reader: |
| conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) |
| |
| p = Producer() |
| p._producer = self._client.create_producer(topic, conf) |
| p._schema = schema |
| p._client = self._client |
| return p |
| |
| def subscribe(self, topic, subscription_name, |
| consumer_type=ConsumerType.Exclusive, |
| schema=schema.BytesSchema(), |
| message_listener=None, |
| receiver_queue_size=1000, |
| max_total_receiver_queue_size_across_partitions=50000, |
| consumer_name=None, |
| unacked_messages_timeout_ms=None, |
| broker_consumer_stats_cache_time_ms=30000, |
| negative_ack_redelivery_delay_ms=60000, |
| is_read_compacted=False, |
| properties=None, |
| pattern_auto_discovery_period=60, |
| initial_position=InitialPosition.Latest, |
| crypto_key_reader=None, |
| replicate_subscription_state_enabled=False |
| ): |
| """ |
| Subscribe to the given topic and subscription combination. |
| |
| **Args** |
| |
| * `topic`: The name of the topic, list of topics or regex pattern. |
| This method will accept these forms: |
| - `topic='my-topic'` |
| - `topic=['topic-1', 'topic-2', 'topic-3']` |
| - `topic=re.compile('persistent://public/default/topic-*')` |
| * `subscription_name`: The name of the subscription. |
| |
| **Options** |
| |
| * `consumer_type`: |
| Select the subscription type to be used when subscribing to the topic. |
| * `schema`: |
| Define the schema of the data that will be received by this consumer. |
| * `message_listener`: |
| Sets a message listener for the consumer. When the listener is set, |
| the application will receive messages through it. Calls to |
| `consumer.receive()` will not be allowed. The listener function needs |
| to accept (consumer, message), for example: |
| |
| #!python |
| def my_listener(consumer, message): |
| # process message |
| consumer.acknowledge(message) |
| |
| * `receiver_queue_size`: |
| Sets the size of the consumer receive queue. The consumer receive |
| queue controls how many messages can be accumulated by the consumer |
| before the application calls `receive()`. Using a higher value could |
| potentially increase the consumer throughput at the expense of higher |
| memory utilization. Setting the consumer queue size to zero decreases |
| the throughput of the consumer by disabling pre-fetching of messages. |
| This approach improves the message distribution on shared subscription |
| by pushing messages only to those consumers that are ready to process |
| them. Neither receive with timeout nor partitioned topics can be used |
| if the consumer queue size is zero. The `receive()` function call |
| should not be interrupted when the consumer queue size is zero. The |
| default value is 1000 messages and should work well for most use |
| cases. |
| * `max_total_receiver_queue_size_across_partitions`: |
| Set the max total receiver queue size across partitions. |
| This setting will be used to reduce the receiver queue size for individual partitions. |
| * `consumer_name`: |
| Sets the consumer name. |
| * `unacked_messages_timeout_ms`: |
| Sets the timeout in milliseconds for unacknowledged messages. The |
| timeout needs to be greater than 10 seconds. An exception is thrown if |
| the given value is less than 10 seconds. If a successful |
| acknowledgement is not sent within the timeout, all the unacknowledged |
| messages are redelivered. |
| * `negative_ack_redelivery_delay_ms`: |
| The delay after which to redeliver the messages that failed to be |
| processed (with the `consumer.negative_acknowledge()`) |
| * `broker_consumer_stats_cache_time_ms`: |
| Sets the time duration for which the broker-side consumer stats will |
| be cached in the client. |
| * `is_read_compacted`: |
| Selects whether to read the compacted version of the topic |
| * `properties`: |
| Sets the properties for the consumer. The properties associated with a consumer |
| can be used to identify a consumer on the broker side. |
| * `pattern_auto_discovery_period`: |
| Period, in seconds, at which the consumer auto-discovers topics matching the pattern. |
| * `initial_position`: |
| Set the initial position of a consumer when subscribing to the topic. |
| It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`. |
| Default: `Latest`. |
| * `crypto_key_reader`: |
| Symmetric encryption class implementation that configures public-key encryption of messages for the producer |
| and private-key decryption of messages for the consumer. |
| * `replicate_subscription_state_enabled`: |
| Set whether the subscription status should be replicated. |
| Default: `False`. |
| """ |
| _check_type(str, subscription_name, 'subscription_name') |
| _check_type(ConsumerType, consumer_type, 'consumer_type') |
| _check_type(_schema.Schema, schema, 'schema') |
| _check_type(int, receiver_queue_size, 'receiver_queue_size') |
| _check_type(int, max_total_receiver_queue_size_across_partitions, |
| 'max_total_receiver_queue_size_across_partitions') |
| _check_type_or_none(str, consumer_name, 'consumer_name') |
| _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') |
| _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') |
| _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms') |
| _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period') |
| _check_type(bool, is_read_compacted, 'is_read_compacted') |
| _check_type_or_none(dict, properties, 'properties') |
| _check_type(InitialPosition, initial_position, 'initial_position') |
| _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') |
| |
| conf = _pulsar.ConsumerConfiguration() |
| conf.consumer_type(consumer_type) |
| conf.read_compacted(is_read_compacted) |
| if message_listener: |
| conf.message_listener(_listener_wrapper(message_listener, schema)) |
| conf.receiver_queue_size(receiver_queue_size) |
| conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) |
| if consumer_name: |
| conf.consumer_name(consumer_name) |
| if unacked_messages_timeout_ms: |
| conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) |
| |
| conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms) |
| conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) |
| if properties: |
| for k, v in properties.items(): |
| conf.property(k, v) |
| conf.subscription_initial_position(initial_position) |
| |
| conf.schema(schema.schema_info()) |
| |
| if crypto_key_reader: |
| conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) |
| |
| conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled) |
| |
| c = Consumer() |
| if isinstance(topic, str): |
| # Single topic |
| c._consumer = self._client.subscribe(topic, subscription_name, conf) |
| elif isinstance(topic, list): |
| # List of topics |
| c._consumer = self._client.subscribe_topics(topic, subscription_name, conf) |
| elif isinstance(topic, _retype): |
| # Regex pattern |
| c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf) |
| else: |
| raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)") |
| |
| c._client = self |
| c._schema = schema |
| self._consumers.append(c) |
| return c |
| |
| def create_reader(self, topic, start_message_id, |
| schema=schema.BytesSchema(), |
| reader_listener=None, |
| receiver_queue_size=1000, |
| reader_name=None, |
| subscription_role_prefix=None, |
| is_read_compacted=False, |
| crypto_key_reader=None |
| ): |
| """ |
| Create a reader on a particular topic |
| |
| **Args** |
| |
| * `topic`: The name of the topic. |
| * `start_message_id`: The initial reader positioning is done by specifying a message id. |
| The options are: |
| * `MessageId.earliest`: Start reading from the earliest message available in the topic |
| * `MessageId.latest`: Start reading from the end of the topic, only getting messages published |
| after the reader was created |
| * `MessageId`: When passing a particular message id, the reader will position itself on |
| that specific position. The first message to be read will be the message next to the |
| specified messageId. Message id can be serialized into a string and deserialized |
| back into a `MessageId` object: |
| |
| # Serialize to string |
| s = msg.message_id().serialize() |
| |
| # Deserialize from string |
| msg_id = MessageId.deserialize(s) |
| |
| **Options** |
| |
| * `schema`: |
| Define the schema of the data that will be received by this reader. |
| * `reader_listener`: |
| Sets a message listener for the reader. When the listener is set, |
| the application will receive messages through it. Calls to |
| `reader.read_next()` will not be allowed. The listener function needs |
| to accept (reader, message), for example: |
| |
| def my_listener(reader, message): |
| # process message |
| pass |
| |
| * `receiver_queue_size`: |
| Sets the size of the reader receive queue. The reader receive |
| queue controls how many messages can be accumulated by the reader |
| before the application calls `read_next()`. Using a higher value could |
| potentially increase the reader throughput at the expense of higher |
| memory utilization. |
| * `reader_name`: |
| Sets the reader name. |
| * `subscription_role_prefix`: |
| Sets the subscription role prefix. |
| * `is_read_compacted`: |
| Selects whether to read the compacted version of the topic |
| * `crypto_key_reader`: |
| Symmetric encryption class implementation that configures public-key encryption of messages for the producer |
| and private-key decryption of messages for the consumer. |
| """ |
| _check_type(str, topic, 'topic') |
| _check_type(_pulsar.MessageId, start_message_id, 'start_message_id') |
| _check_type(_schema.Schema, schema, 'schema') |
| _check_type(int, receiver_queue_size, 'receiver_queue_size') |
| _check_type_or_none(str, reader_name, 'reader_name') |
| _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix') |
| _check_type(bool, is_read_compacted, 'is_read_compacted') |
| _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') |
| |
| conf = _pulsar.ReaderConfiguration() |
| if reader_listener: |
| conf.reader_listener(_listener_wrapper(reader_listener, schema)) |
| conf.receiver_queue_size(receiver_queue_size) |
| if reader_name: |
| conf.reader_name(reader_name) |
| if subscription_role_prefix: |
| conf.subscription_role_prefix(subscription_role_prefix) |
| conf.schema(schema.schema_info()) |
| conf.read_compacted(is_read_compacted) |
| if crypto_key_reader: |
| conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) |
| |
| c = Reader() |
| c._reader = self._client.create_reader(topic, start_message_id, conf) |
| c._client = self |
| c._schema = schema |
| self._consumers.append(c) |
| return c |
| |
| def get_topic_partitions(self, topic): |
| """ |
| Get the list of partitions for a given topic. |
| |
| If the topic is partitioned, this will return a list of partition names. If the topic is not |
| partitioned, the returned list will contain the topic name itself. |
| |
| This can be used to discover the partitions and create Reader, Consumer or Producer |
| instances directly on a particular partition. |
| :param topic: the topic name to lookup |
| :return: a list of partition names |
| """ |
| _check_type(str, topic, 'topic') |
| return self._client.get_topic_partitions(topic) |
| |
| def shutdown(self): |
| """ |
| Perform immediate shutdown of Pulsar client. |
| |
| Release all resources and close all producer, consumer, and readers without waiting |
| for ongoing operations to complete. |
| """ |
| self._client.shutdown() |
| |
| def close(self): |
| """ |
| Close the client and all the associated producers and consumers |
| """ |
| self._client.close() |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.Client">Client</a></li> |
| </ul> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Client.__init__"> |
| <p>def <span class="ident">__init__</span>(</p><p>self, service_url, authentication=None, operation_timeout_seconds=30, io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000, log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, tls_validate_hostname=False, logger=None, connection_timeout_ms=10000)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Create a new Pulsar client instance.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>service_url</code>: The Pulsar service url eg: pulsar://my-broker.com:6650/</li> |
| </ul> |
| <p><strong>Options</strong></p> |
| <ul> |
| <li><code>authentication</code>: |
| Set the authentication provider to be used with the broker. For example: |
| <code>AuthenticationTls</code>, <code>AuthenticationToken</code>, <code>AuthenticationAthenz</code> or <code>AuthenticationOauth2</code></li> |
| <li><code>operation_timeout_seconds</code>: |
| Set timeout on client operations (subscribe, create producer, close, |
| unsubscribe).</li> |
| <li><code>io_threads</code>: |
| Set the number of IO threads to be used by the Pulsar client.</li> |
| <li><code>message_listener_threads</code>: |
| Set the number of threads to be used by the Pulsar client when |
| delivering messages through message listener. The default is 1 thread |
| per Pulsar client. If using more than 1 thread, messages for distinct |
| <code>message_listener</code>s will be delivered in different threads, however a |
| single <code>MessageListener</code> will always be assigned to the same thread.</li> |
| <li><code>concurrent_lookup_requests</code>: |
| Number of concurrent lookup-requests allowed on each broker connection |
| to prevent overload on the broker.</li> |
| <li><code>log_conf_file_path</code>: |
| Initialize log4cxx from a configuration file.</li> |
| <li><code>use_tls</code>: |
| Configure whether to use TLS encryption on the connection. This setting |
| is deprecated. TLS will be automatically enabled if the <code>service_url</code> |
| starts with <code>pulsar+ssl://</code> or <code>https://</code>.</li> |
| <li><code>tls_trust_certs_file_path</code>: |
| Set the path to the trusted TLS certificate file. If empty defaults to |
| certifi.</li> |
| <li><code>tls_allow_insecure_connection</code>: |
| Configure whether the Pulsar client accepts untrusted TLS certificates |
| from the broker.</li> |
| <li><code>tls_validate_hostname</code>: |
| Configure whether the Pulsar client validates that the hostname of the |
| endpoint, matches the common name on the TLS certificate presented by |
| the endpoint.</li> |
| <li><code>logger</code>: |
| Set a Python logger for this Pulsar client. Should be an instance of <code>logging.Logger</code>.</li> |
| <li><code>connection_timeout_ms</code>: |
| Set timeout in milliseconds on TCP connections.</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.__init__', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Client.__init__" class="source"> |
| <pre><code>def __init__(self, service_url, |
| authentication=None, |
| operation_timeout_seconds=30, |
| io_threads=1, |
| message_listener_threads=1, |
| concurrent_lookup_requests=50000, |
| log_conf_file_path=None, |
| use_tls=False, |
| tls_trust_certs_file_path=None, |
| tls_allow_insecure_connection=False, |
| tls_validate_hostname=False, |
| logger=None, |
| connection_timeout_ms=10000, |
| ): |
| """ |
| Create a new Pulsar client instance. |
| **Args** |
| * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/ |
| **Options** |
| * `authentication`: |
| Set the authentication provider to be used with the broker. For example: |
| `AuthenticationTls`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2` |
| * `operation_timeout_seconds`: |
| Set timeout on client operations (subscribe, create producer, close, |
| unsubscribe). |
| * `io_threads`: |
| Set the number of IO threads to be used by the Pulsar client. |
| * `message_listener_threads`: |
| Set the number of threads to be used by the Pulsar client when |
| delivering messages through message listener. The default is 1 thread |
| per Pulsar client. If using more than 1 thread, messages for distinct |
| `message_listener`s will be delivered in different threads, however a |
| single `MessageListener` will always be assigned to the same thread. |
| * `concurrent_lookup_requests`: |
| Number of concurrent lookup-requests allowed on each broker connection |
| to prevent overload on the broker. |
| * `log_conf_file_path`: |
| Initialize log4cxx from a configuration file. |
| * `use_tls`: |
| Configure whether to use TLS encryption on the connection. This setting |
| is deprecated. TLS will be automatically enabled if the `service_url` |
| starts with `pulsar+ssl://` or `https://`. |
| * `tls_trust_certs_file_path`: |
| Set the path to the trusted TLS certificate file. If empty defaults to |
| certifi. |
| * `tls_allow_insecure_connection`: |
| Configure whether the Pulsar client accepts untrusted TLS certificates |
| from the broker. |
| * `tls_validate_hostname`: |
| Configure whether the Pulsar client validates that the hostname of the |
| endpoint, matches the common name on the TLS certificate presented by |
| the endpoint. |
| * `logger`: |
| Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`. |
| * `connection_timeout_ms`: |
| Set timeout in milliseconds on TCP connections. |
| """ |
| _check_type(str, service_url, 'service_url') |
| _check_type_or_none(Authentication, authentication, 'authentication') |
| _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds') |
| _check_type(int, connection_timeout_ms, 'connection_timeout_ms') |
| _check_type(int, io_threads, 'io_threads') |
| _check_type(int, message_listener_threads, 'message_listener_threads') |
| _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests') |
| _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path') |
| _check_type(bool, use_tls, 'use_tls') |
| _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') |
| _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection') |
| _check_type(bool, tls_validate_hostname, 'tls_validate_hostname') |
| _check_type_or_none(logging.Logger, logger, 'logger') |
| conf = _pulsar.ClientConfiguration() |
| if authentication: |
| conf.authentication(authentication.auth) |
| conf.operation_timeout_seconds(operation_timeout_seconds) |
| conf.connection_timeout(connection_timeout_ms) |
| conf.io_threads(io_threads) |
| conf.message_listener_threads(message_listener_threads) |
| conf.concurrent_lookup_requests(concurrent_lookup_requests) |
| if log_conf_file_path: |
| conf.log_conf_file_path(log_conf_file_path) |
| if logger: |
| conf.set_logger(logger) |
| if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): |
| conf.use_tls(True) |
| if tls_trust_certs_file_path: |
| conf.tls_trust_certs_file_path(tls_trust_certs_file_path) |
| else: |
| conf.tls_trust_certs_file_path(certifi.where()) |
| conf.tls_allow_insecure_connection(tls_allow_insecure_connection) |
| conf.tls_validate_hostname(tls_validate_hostname) |
| self._client = _pulsar.Client(service_url, conf) |
| self._consumers = [] |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Client.close"> |
| <p>def <span class="ident">close</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Close the client and all the associated producers and consumers</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.close', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Client.close" class="source"> |
| <pre><code>def close(self): |
| """ |
| Close the client and all the associated producers and consumers |
| """ |
| self._client.close() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Client.create_producer"> |
| <p>def <span class="ident">create_producer</span>(</p><p>self, topic, producer_name=None, schema=&lt;pulsar.schema.schema.BytesSchema object at 0x7fbbd5c4ee50&gt;, initial_sequence_id=None, send_timeout_millis=30000, compression_type=_pulsar.CompressionType.NONE, max_pending_messages=1000, max_pending_messages_across_partitions=50000, block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000, batching_max_allowed_size_in_bytes=131072, batching_max_publish_delay_ms=10, message_routing_mode=_pulsar.PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers=False, properties=None, batching_type=_pulsar.BatchingType.Default, encryption_key=None, crypto_key_reader=None)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Create a new producer on a given topic.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>topic</code>: |
| The topic name</li> |
| </ul> |
| <p><strong>Options</strong></p> |
| <ul> |
| <li><code>producer_name</code>: |
| Specify a name for the producer. If not assigned, |
| the system will generate a globally unique name which can be accessed |
| with <code>Producer.producer_name()</code>. When specifying a name, it is up to |
| the user to ensure that, for a given topic, the producer name is unique |
| across all Pulsar's clusters.</li> |
| <li><code>schema</code>: |
| Define the schema of the data that will be published by this producer. |
| The schema will be used for two purposes:<ul> |
| <li>Validate the data format against the topic defined schema</li> |
| <li>Perform serialization/deserialization between data and objects</li> |
| </ul> |
| An example for this parameter would be to pass <code>schema=JsonSchema(MyRecordClass)</code>. |
| </li> |
| <li><code>initial_sequence_id</code>: |
| Set the baseline for the sequence ids for messages |
| published by the producer. First message will be using |
| <code>(initialSequenceId + 1)</code> as its sequence id and subsequent messages will |
| be assigned incremental sequence ids, if not otherwise specified.</li> |
| <li><code>send_timeout_millis</code>: |
| If a message is not acknowledged by the server before the |
| <code>send_timeout</code> expires, an error will be reported.</li> |
| <li><code>compression_type</code>: |
| Set the compression type for the producer. By default, message |
| payloads are not compressed. Supported compression types are |
| <code>CompressionType.LZ4</code>, <code>CompressionType.ZLib</code>, <code>CompressionType.ZSTD</code> and <code>CompressionType.SNAPPY</code>. |
| ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that |
| release in order to be able to receive messages compressed with ZSTD. |
| SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that |
| release in order to be able to receive messages compressed with SNAPPY.</li> |
| <li><code>max_pending_messages</code>: |
| Set the max size of the queue holding the messages pending to receive |
| an acknowledgment from the broker.</li> |
| <li><code>max_pending_messages_across_partitions</code>: |
| Set the max size of the queue holding the messages pending to receive |
| an acknowledgment across partitions from the broker.</li> |
| <li><code>block_if_queue_full</code>: Set whether <code>send_async</code> operations should |
| block when the outgoing message queue is full.</li> |
| <li><code>message_routing_mode</code>: |
| Set the message routing mode for the partitioned producer. Default is <code>PartitionsRoutingMode.RoundRobinDistribution</code>, |
| other option is <code>PartitionsRoutingMode.UseSinglePartition</code></li> |
| <li><code>lazy_start_partitioned_producers</code>: |
| This config affects producers of partitioned topics only. It controls whether |
| producers register and connect immediately to the owner broker of each partition |
| or start lazily on demand. The internal producer of one partition is always |
| started eagerly, chosen by the routing policy, but the internal producers of |
| any additional partitions are started on demand, upon receiving their first |
| message. |
| Using this mode can reduce the strain on brokers for topics with large numbers of |
| partitions and when the SinglePartition routing policy is used without keyed messages. |
| Because producer connection can be on demand, this can produce extra send latency |
| for the first messages of a given partition.</li> |
| <li><code>properties</code>: |
| Sets the properties for the producer. The properties associated with a producer |
| can be used to identify a producer on the broker side.</li> |
| <li> |
| <p><code>batching_type</code>: |
| Sets the batching type for the producer. |
| There are two batching types: DefaultBatching and KeyBasedBatching.</p> |
| <ul> |
| <li> |
| <p>Default batching |
| incoming single messages: |
| (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) |
| batched into single batch message: |
| [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]</p> |
| </li> |
| <li> |
| <p>KeyBasedBatching |
| incoming single messages: |
| (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) |
| batched into single batch message: |
| [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]</p> |
| </li> |
| <li><code>encryption_key</code>: |
| The key used for symmetric encryption, configured on the producer side</li> |
| <li><code>crypto_key_reader</code>: |
| Symmetric encryption class implementation, configuring public key encryption messages for the producer |
| and private key decryption messages for the consumer</li> |
| </ul> |
| </li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.create_producer', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Client.create_producer" class="source"> |
| <pre><code>def create_producer(self, topic, |
| producer_name=None, |
| schema=schema.BytesSchema(), |
| initial_sequence_id=None, |
| send_timeout_millis=30000, |
| compression_type=CompressionType.NONE, |
| max_pending_messages=1000, |
| max_pending_messages_across_partitions=50000, |
| block_if_queue_full=False, |
| batching_enabled=False, |
| batching_max_messages=1000, |
| batching_max_allowed_size_in_bytes=128*1024, |
| batching_max_publish_delay_ms=10, |
| message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution, |
| lazy_start_partitioned_producers=False, |
| properties=None, |
| batching_type=BatchingType.Default, |
| encryption_key=None, |
| crypto_key_reader=None |
| ): |
| """ |
| Create a new producer on a given topic. |
| **Args** |
| * `topic`: |
| The topic name |
| **Options** |
| * `producer_name`: |
| Specify a name for the producer. If not assigned, |
| the system will generate a globally unique name which can be accessed |
| with `Producer.producer_name()`. When specifying a name, it is app to |
| the user to ensure that, for a given topic, the producer name is unique |
| across all Pulsar's clusters. |
| * `schema`: |
| Define the schema of the data that will be published by this producer. |
| The schema will be used for two purposes: |
| - Validate the data format against the topic defined schema |
| - Perform serialization/deserialization between data and objects |
| An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`. |
| * `initial_sequence_id`: |
| Set the baseline for the sequence ids for messages |
| published by the producer. First message will be using |
| `(initialSequenceId + 1)`` as its sequence id and subsequent messages will |
| be assigned incremental sequence ids, if not otherwise specified. |
| * `send_timeout_millis`: |
| If a message is not acknowledged by the server before the |
| `send_timeout` expires, an error will be reported. |
| * `compression_type`: |
| Set the compression type for the producer. By default, message |
| payloads are not compressed. Supported compression types are |
| `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`. |
| ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that |
| release in order to be able to receive messages compressed with ZSTD. |
| SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that |
| release in order to be able to receive messages compressed with SNAPPY. |
| * `max_pending_messages`: |
| Set the max size of the queue holding the messages pending to receive |
| an acknowledgment from the broker. |
| * `max_pending_messages_across_partitions`: |
| Set the max size of the queue holding the messages pending to receive |
| an acknowledgment across partitions from the broker. |
| * `block_if_queue_full`: Set whether `send_async` operations should |
| block when the outgoing message queue is full. |
| * `message_routing_mode`: |
| Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`, |
| other option is `PartitionsRoutingMode.UseSinglePartition` |
| * `lazy_start_partitioned_producers`: |
| This config affects producers of partitioned topics only. It controls whether |
| producers register and connect immediately to the owner broker of each partition |
| or start lazily on demand. The internal producer of one partition is always |
| started eagerly, chosen by the routing policy, but the internal producers of |
| any additional partitions are started on demand, upon receiving their first |
| message. |
| Using this mode can reduce the strain on brokers for topics with large numbers of |
| partitions and when the SinglePartition routing policy is used without keyed messages. |
| Because producer connection can be on demand, this can produce extra send latency |
| for the first messages of a given partition. |
| * `properties`: |
| Sets the properties for the producer. The properties associated with a producer |
| can be used for identify a producer at broker side. |
| * `batching_type`: |
| Sets the batching type for the producer. |
| There are two batching type: DefaultBatching and KeyBasedBatching. |
| - Default batching |
| incoming single messages: |
| (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) |
| batched into single batch message: |
| [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)] |
| - KeyBasedBatching |
| incoming single messages: |
| (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) |
| batched into single batch message: |
| [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] |
| * encryption_key: |
| The key used for symmetric encryption, configured on the producer side |
| * crypto_key_reader: |
| Symmetric encryption class implementation, configuring public key encryption messages for the producer |
| and private key decryption messages for the consumer |
| """ |
| _check_type(str, topic, 'topic') |
| _check_type_or_none(str, producer_name, 'producer_name') |
| _check_type(_schema.Schema, schema, 'schema') |
| _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id') |
| _check_type(int, send_timeout_millis, 'send_timeout_millis') |
| _check_type(CompressionType, compression_type, 'compression_type') |
| _check_type(int, max_pending_messages, 'max_pending_messages') |
| _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions') |
| _check_type(bool, block_if_queue_full, 'block_if_queue_full') |
| _check_type(bool, batching_enabled, 'batching_enabled') |
| _check_type(int, batching_max_messages, 'batching_max_messages') |
| _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes') |
| _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms') |
| _check_type_or_none(dict, properties, 'properties') |
| _check_type(BatchingType, batching_type, 'batching_type') |
| _check_type_or_none(str, encryption_key, 'encryption_key') |
| _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') |
| _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers') |
| conf = _pulsar.ProducerConfiguration() |
| conf.send_timeout_millis(send_timeout_millis) |
| conf.compression_type(compression_type) |
| conf.max_pending_messages(max_pending_messages) |
| conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) |
| conf.block_if_queue_full(block_if_queue_full) |
| conf.batching_enabled(batching_enabled) |
| conf.batching_max_messages(batching_max_messages) |
| conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes) |
| conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms) |
| conf.partitions_routing_mode(message_routing_mode) |
| conf.batching_type(batching_type) |
| conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) |
| if producer_name: |
| conf.producer_name(producer_name) |
| if initial_sequence_id: |
| conf.initial_sequence_id(initial_sequence_id) |
| if properties: |
| for k, v in properties.items(): |
| conf.property(k, v) |
| conf.schema(schema.schema_info()) |
| if encryption_key: |
| conf.encryption_key(encryption_key) |
| if crypto_key_reader: |
| conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) |
| p = Producer() |
| p._producer = self._client.create_producer(topic, conf) |
| p._schema = schema |
| p._client = self._client |
| return p |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Client.create_reader"> |
| <p>def <span class="ident">create_reader</span>(</p><p>self, topic, start_message_id, schema=&lt;pulsar.schema.schema.BytesSchema object at 0x7fbbce1c1350&gt;, reader_listener=None, receiver_queue_size=1000, reader_name=None, subscription_role_prefix=None, is_read_compacted=False, crypto_key_reader=None)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Create a reader on a particular topic</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>topic</code>: The name of the topic.</li> |
| <li><code>start_message_id</code>: The initial reader positioning is done by specifying a message id. |
| The options are:<ul> |
| <li><code>MessageId.earliest</code>: Start reading from the earliest message available in the topic</li> |
| <li><code>MessageId.latest</code>: Start reading from the end of the topic, only getting messages published |
| after the reader was created</li> |
| <li> |
| <p><code>MessageId</code>: When passing a particular message id, the reader will position itself on |
| that specific position. The first message to be read will be the message next to the |
| specified messageId. Message id can be serialized into a string and deserialized |
| back into a <code>MessageId</code> object:</p> |
| <pre><code># Serialize to string |
| s = msg.message_id().serialize() |
| # Deserialize from string |
| msg_id = MessageId.deserialize(s) |
| </code></pre> |
| </li> |
| </ul> |
| </li> |
| </ul> |
| <p><strong>Options</strong></p> |
| <ul> |
| <li><code>schema</code>: |
| Define the schema of the data that will be received by this reader.</li> |
| <li> |
| <p><code>reader_listener</code>: |
| Sets a message listener for the reader. When the listener is set, |
| the application will receive messages through it. Calls to |
| <code>reader.read_next()</code> will not be allowed. The listener function needs |
| to accept (reader, message), for example:</p> |
| <pre><code>def my_listener(reader, message): |
| # process message |
| pass |
| </code></pre> |
| </li> |
| <li> |
| <p><code>receiver_queue_size</code>: |
| Sets the size of the reader receive queue. The reader receive |
| queue controls how many messages can be accumulated by the reader |
| before the application calls <code>read_next()</code>. Using a higher value could |
| potentially increase the reader throughput at the expense of higher |
| memory utilization.</p> |
| </li> |
| <li><code>reader_name</code>: |
| Sets the reader name.</li> |
| <li><code>subscription_role_prefix</code>: |
| Sets the subscription role prefix.</li> |
| <li><code>is_read_compacted</code>: |
| Selects whether to read the compacted version of the topic</li> |
| <li><code>crypto_key_reader</code>: |
| Symmetric encryption class implementation, configuring public key encryption messages for the producer |
| and private key decryption messages for the consumer</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.create_reader', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Client.create_reader" class="source"> |
| <pre><code>def create_reader(self, topic, start_message_id, |
| schema=schema.BytesSchema(), |
| reader_listener=None, |
| receiver_queue_size=1000, |
| reader_name=None, |
| subscription_role_prefix=None, |
| is_read_compacted=False, |
| crypto_key_reader=None |
| ): |
| """ |
| Create a reader on a particular topic |
| **Args** |
| * `topic`: The name of the topic. |
| * `start_message_id`: The initial reader positioning is done by specifying a message id. |
| The options are: |
| * `MessageId.earliest`: Start reading from the earliest message available in the topic |
| * `MessageId.latest`: Start reading from the end topic, only getting messages published |
| after the reader was created |
| * `MessageId`: When passing a particular message id, the reader will position itself on |
| that specific position. The first message to be read will be the message next to the |
| specified messageId. Message id can be serialized into a string and deserialized |
| back into a `MessageId` object: |
| # Serialize to string |
| s = msg.message_id().serialize() |
| # Deserialize from string |
| msg_id = MessageId.deserialize(s) |
| **Options** |
| * `schema`: |
| Define the schema of the data that will be received by this reader. |
| * `reader_listener`: |
| Sets a message listener for the reader. When the listener is set, |
| the application will receive messages through it. Calls to |
| `reader.read_next()` will not be allowed. The listener function needs |
| to accept (reader, message), for example: |
| def my_listener(reader, message): |
| # process message |
| pass |
| * `receiver_queue_size`: |
| Sets the size of the reader receive queue. The reader receive |
| queue controls how many messages can be accumulated by the reader |
| before the application calls `read_next()`. Using a higher value could |
| potentially increase the reader throughput at the expense of higher |
| memory utilization. |
| * `reader_name`: |
| Sets the reader name. |
| * `subscription_role_prefix`: |
| Sets the subscription role prefix. |
| * `is_read_compacted`: |
| Selects whether to read the compacted version of the topic |
| * crypto_key_reader: |
| Symmetric encryption class implementation, configuring public key encryption messages for the producer |
| and private key decryption messages for the consumer |
| """ |
| _check_type(str, topic, 'topic') |
| _check_type(_pulsar.MessageId, start_message_id, 'start_message_id') |
| _check_type(_schema.Schema, schema, 'schema') |
| _check_type(int, receiver_queue_size, 'receiver_queue_size') |
| _check_type_or_none(str, reader_name, 'reader_name') |
| _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix') |
| _check_type(bool, is_read_compacted, 'is_read_compacted') |
| _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') |
| conf = _pulsar.ReaderConfiguration() |
| if reader_listener: |
| conf.reader_listener(_listener_wrapper(reader_listener, schema)) |
| conf.receiver_queue_size(receiver_queue_size) |
| if reader_name: |
| conf.reader_name(reader_name) |
| if subscription_role_prefix: |
| conf.subscription_role_prefix(subscription_role_prefix) |
| conf.schema(schema.schema_info()) |
| conf.read_compacted(is_read_compacted) |
| if crypto_key_reader: |
| conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) |
| c = Reader() |
| c._reader = self._client.create_reader(topic, start_message_id, conf) |
| c._client = self |
| c._schema = schema |
| self._consumers.append(c) |
| return c |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Client.get_topic_partitions"> |
| <p>def <span class="ident">get_topic_partitions</span>(</p><p>self, topic)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Get the list of partitions for a given topic.</p> |
| <p>If the topic is partitioned, this will return a list of partition names. If the topic is not |
| partitioned, the returned list will contain the topic name itself.</p> |
| <p>This can be used to discover the partitions and create Reader, Consumer or Producer |
| instances directly on a particular partition. |
| :param topic: the topic name to look up |
| :return: a list of partition names</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.get_topic_partitions', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Client.get_topic_partitions" class="source"> |
| <pre><code>def get_topic_partitions(self, topic): |
| """ |
| Get the list of partitions for a given topic. |
| If the topic is partitioned, this will return a list of partition names. If the topic is not |
| partitioned, the returned list will contain the topic name itself. |
| This can be used to discover the partitions and create Reader, Consumer or Producer |
| instances directly on a particular partition. |
| :param topic: the topic name to lookup |
| :return: a list of partition name |
| """ |
| _check_type(str, topic, 'topic') |
| return self._client.get_topic_partitions(topic) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Client.shutdown"> |
| <p>def <span class="ident">shutdown</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Perform immediate shutdown of Pulsar client.</p> |
| <p>Release all resources and close all producer, consumer, and readers without waiting |
| for ongoing operations to complete.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.shutdown', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Client.shutdown" class="source"> |
| <pre><code>def shutdown(self): |
| """ |
| Perform immediate shutdown of Pulsar client. |
| Release all resources and close all producer, consumer, and readers without waiting |
| for ongoing operations to complete. |
| """ |
| self._client.shutdown() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Client.subscribe"> |
| <p>def <span class="ident">subscribe</span>(</p><p>self, topic, subscription_name, consumer_type=_pulsar.ConsumerType.Exclusive, schema=&lt;pulsar.schema.schema.BytesSchema object at 0x7fbbce1c1290&gt;, message_listener=None, receiver_queue_size=1000, max_total_receiver_queue_size_across_partitions=50000, consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, negative_ack_redelivery_delay_ms=60000, is_read_compacted=False, properties=None, pattern_auto_discovery_period=60, initial_position=_pulsar.InitialPosition.Latest, crypto_key_reader=None, replicate_subscription_state_enabled=False)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Subscribe to the given topic and subscription combination.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>topic</code>: The name of the topic, list of topics or regex pattern. |
| This method will accept these forms: |
| - <code>topic='my-topic'</code> |
| - <code>topic=['topic-1', 'topic-2', 'topic-3']</code> |
| - <code>topic=re.compile('persistent://public/default/topic-*')</code></li> |
| <li><code>subscription_name</code>: The name of the subscription.</li> |
| </ul> |
| <p><strong>Options</strong></p> |
| <ul> |
| <li><code>consumer_type</code>: |
| Select the subscription type to be used when subscribing to the topic.</li> |
| <li><code>schema</code>: |
| Define the schema of the data that will be received by this consumer.</li> |
| <li> |
| <p><code>message_listener</code>: |
| Sets a message listener for the consumer. When the listener is set, |
| the application will receive messages through it. Calls to |
| <code>consumer.receive()</code> will not be allowed. The listener function needs |
| to accept (consumer, message), for example:</p> |
| <pre><code>#!python |
| def my_listener(consumer, message): |
| # process message |
| consumer.acknowledge(message) |
| </code></pre> |
| </li> |
| <li> |
| <p><code>receiver_queue_size</code>: |
| Sets the size of the consumer receive queue. The consumer receive |
| queue controls how many messages can be accumulated by the consumer |
| before the application calls <code>receive()</code>. Using a higher value could |
| potentially increase the consumer throughput at the expense of higher |
| memory utilization. Setting the consumer queue size to zero decreases |
| the throughput of the consumer by disabling pre-fetching of messages. |
| This approach improves the message distribution on shared subscription |
| by pushing messages only to those consumers that are ready to process |
| them. Neither receive with timeout nor partitioned topics can be used |
| if the consumer queue size is zero. The <code>receive()</code> function call |
| should not be interrupted when the consumer queue size is zero. The |
| default value is 1000 messages and should work well for most use |
| cases.</p> |
| </li> |
| <li><code>max_total_receiver_queue_size_across_partitions</code>: |
| Set the max total receiver queue size across partitions. |
| This setting will be used to reduce the receiver queue size for individual partitions</li> |
| <li><code>consumer_name</code>: |
| Sets the consumer name.</li> |
| <li><code>unacked_messages_timeout_ms</code>: |
| Sets the timeout in milliseconds for unacknowledged messages. The |
| timeout needs to be greater than 10 seconds. An exception is thrown if |
| the given value is less than 10 seconds. If a successful |
| acknowledgement is not sent within the timeout, all the unacknowledged |
| messages are redelivered.</li> |
| <li><code>negative_ack_redelivery_delay_ms</code>: |
| The delay after which to redeliver the messages that failed to be |
| processed (with the <code>consumer.negative_acknowledge()</code>)</li> |
| <li><code>broker_consumer_stats_cache_time_ms</code>: |
| Sets the time duration for which the broker-side consumer stats will |
| be cached in the client.</li> |
| <li><code>is_read_compacted</code>: |
| Selects whether to read the compacted version of the topic</li> |
| <li><code>properties</code>: |
| Sets the properties for the consumer. The properties associated with a consumer |
| can be used to identify a consumer on the broker side.</li> |
| <li><code>pattern_auto_discovery_period</code>: |
| Period, in seconds, at which the consumer auto-discovers topics matching the pattern.</li> |
| <li><code>initial_position</code>: |
| Set the initial position of a consumer when subscribing to the topic. |
| It could be either: <code>InitialPosition.Earliest</code> or <code>InitialPosition.Latest</code>. |
| Default: <code>Latest</code>.</li> |
| <li><code>crypto_key_reader</code>: |
| Symmetric encryption class implementation, configuring public key encryption messages for the producer |
| and private key decryption messages for the consumer</li> |
| <li><code>replicate_subscription_state_enabled</code>: |
| Set whether the subscription status should be replicated. |
| Default: <code>False</code>.</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.subscribe', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Client.subscribe" class="source"> |
| <pre><code>def subscribe(self, topic, subscription_name, |
| consumer_type=ConsumerType.Exclusive, |
| schema=schema.BytesSchema(), |
| message_listener=None, |
| receiver_queue_size=1000, |
| max_total_receiver_queue_size_across_partitions=50000, |
| consumer_name=None, |
| unacked_messages_timeout_ms=None, |
| broker_consumer_stats_cache_time_ms=30000, |
| negative_ack_redelivery_delay_ms=60000, |
| is_read_compacted=False, |
| properties=None, |
| pattern_auto_discovery_period=60, |
| initial_position=InitialPosition.Latest, |
| crypto_key_reader=None, |
| replicate_subscription_state_enabled=False |
| ): |
| """ |
| Subscribe to the given topic and subscription combination. |
| **Args** |
| * `topic`: The name of the topic, list of topics or regex pattern. |
| This method will accept these forms: |
| - `topic='my-topic'` |
| - `topic=['topic-1', 'topic-2', 'topic-3']` |
| - `topic=re.compile('persistent://public/default/topic-*')` |
| * `subscription`: The name of the subscription. |
| **Options** |
| * `consumer_type`: |
| Select the subscription type to be used when subscribing to the topic. |
| * `schema`: |
| Define the schema of the data that will be received by this consumer. |
| * `message_listener`: |
| Sets a message listener for the consumer. When the listener is set, |
| the application will receive messages through it. Calls to |
| `consumer.receive()` will not be allowed. The listener function needs |
| to accept (consumer, message), for example: |
| #!python |
| def my_listener(consumer, message): |
| # process message |
| consumer.acknowledge(message) |
| * `receiver_queue_size`: |
| Sets the size of the consumer receive queue. The consumer receive |
| queue controls how many messages can be accumulated by the consumer |
| before the application calls `receive()`. Using a higher value could |
| potentially increase the consumer throughput at the expense of higher |
| memory utilization. Setting the consumer queue size to zero decreases |
| the throughput of the consumer by disabling pre-fetching of messages. |
| This approach improves the message distribution on shared subscription |
| by pushing messages only to those consumers that are ready to process |
| them. Neither receive with timeout nor partitioned topics can be used |
| if the consumer queue size is zero. The `receive()` function call |
| should not be interrupted when the consumer queue size is zero. The |
| default value is 1000 messages and should work well for most use |
| cases. |
| * `max_total_receiver_queue_size_across_partitions` |
| Set the max total receiver queue size across partitions. |
| This setting will be used to reduce the receiver queue size for individual partitions |
| * `consumer_name`: |
| Sets the consumer name. |
| * `unacked_messages_timeout_ms`: |
| Sets the timeout in milliseconds for unacknowledged messages. The |
| timeout needs to be greater than 10 seconds. An exception is thrown if |
| the given value is less than 10 seconds. If a successful |
| acknowledgement is not sent within the timeout, all the unacknowledged |
| messages are redelivered. |
| * `negative_ack_redelivery_delay_ms`: |
| The delay after which to redeliver the messages that failed to be |
| processed (with the `consumer.negative_acknowledge()`) |
| * `broker_consumer_stats_cache_time_ms`: |
| Sets the time duration for which the broker-side consumer stats will |
| be cached in the client. |
| * `is_read_compacted`: |
| Selects whether to read the compacted version of the topic |
| * `properties`: |
| Sets the properties for the consumer. The properties associated with a consumer |
| can be used for identify a consumer at broker side. |
| * `pattern_auto_discovery_period`: |
| Periods of seconds for consumer to auto discover match topics. |
| * `initial_position`: |
| Set the initial position of a consumer when subscribing to the topic. |
| It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`. |
| Default: `Latest`. |
| * crypto_key_reader: |
| Symmetric encryption class implementation, configuring public key encryption messages for the producer |
| and private key decryption messages for the consumer |
| * replicate_subscription_state_enabled: |
| Set whether the subscription status should be replicated. |
| Default: `False`. |
| """ |
| _check_type(str, subscription_name, 'subscription_name') |
| _check_type(ConsumerType, consumer_type, 'consumer_type') |
| _check_type(_schema.Schema, schema, 'schema') |
| _check_type(int, receiver_queue_size, 'receiver_queue_size') |
| _check_type(int, max_total_receiver_queue_size_across_partitions, |
| 'max_total_receiver_queue_size_across_partitions') |
| _check_type_or_none(str, consumer_name, 'consumer_name') |
| _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') |
| _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') |
| _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms') |
| _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period') |
| _check_type(bool, is_read_compacted, 'is_read_compacted') |
| _check_type_or_none(dict, properties, 'properties') |
| _check_type(InitialPosition, initial_position, 'initial_position') |
| _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') |
| conf = _pulsar.ConsumerConfiguration() |
| conf.consumer_type(consumer_type) |
| conf.read_compacted(is_read_compacted) |
| if message_listener: |
| conf.message_listener(_listener_wrapper(message_listener, schema)) |
| conf.receiver_queue_size(receiver_queue_size) |
| conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) |
| if consumer_name: |
| conf.consumer_name(consumer_name) |
| if unacked_messages_timeout_ms: |
| conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) |
| conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms) |
| conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) |
| if properties: |
| for k, v in properties.items(): |
| conf.property(k, v) |
| conf.subscription_initial_position(initial_position) |
| conf.schema(schema.schema_info()) |
| if crypto_key_reader: |
| conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) |
| conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled) |
| c = Consumer() |
| if isinstance(topic, str): |
| # Single topic |
| c._consumer = self._client.subscribe(topic, subscription_name, conf) |
| elif isinstance(topic, list): |
| # List of topics |
| c._consumer = self._client.subscribe_topics(topic, subscription_name, conf) |
| elif isinstance(topic, _retype): |
| # Regex pattern |
| c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf) |
| else: |
| raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)") |
| c._client = self |
| c._schema = schema |
| self._consumers.append(c) |
| return c |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.Consumer" class="name">class <span class="ident">Consumer</span></p> |
| |
| |
| <div class="desc"><p>Pulsar consumer.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer" class="source"> |
| <pre><code>class Consumer: |
| """ |
| Pulsar consumer. |
| """ |
| |
| def topic(self): |
| """ |
| Return the topic this consumer is subscribed to. |
| """ |
| return self._consumer.topic() |
| |
| def subscription_name(self): |
| """ |
| Return the subscription name. |
| """ |
| return self._consumer.subscription_name() |
| |
| def unsubscribe(self): |
| """ |
| Unsubscribe the current consumer from the topic. |
| |
| This method will block until the operation is completed. Once the |
| consumer is unsubscribed, no more messages will be received and |
| subsequent new messages will not be retained for this consumer. |
| |
| This consumer object cannot be reused. |
| """ |
| return self._consumer.unsubscribe() |
| |
| def receive(self, timeout_millis=None): |
| """ |
| Receive a single message. |
| |
| If a message is not immediately available, this method will block until |
| a new message is available. |
| |
| **Options** |
| |
| * `timeout_millis`: |
| If specified, the receive will raise an exception if a message is not |
| available within the timeout. |
| """ |
| if timeout_millis is None: |
| msg = self._consumer.receive() |
| else: |
| _check_type(int, timeout_millis, 'timeout_millis') |
| msg = self._consumer.receive(timeout_millis) |
| |
| m = Message() |
| m._message = msg |
| m._schema = self._schema |
| return m |
| |
| def acknowledge(self, message): |
| """ |
| Acknowledge the reception of a single message. |
| |
| This method will block until an acknowledgement is sent to the broker. |
| After that, the message will not be re-delivered to this consumer. |
| |
| **Args** |
| |
| * `message`: |
| The received message or message id. |
| """ |
| if isinstance(message, Message): |
| self._consumer.acknowledge(message._message) |
| else: |
| self._consumer.acknowledge(message) |
| |
| def acknowledge_cumulative(self, message): |
| """ |
| Acknowledge the reception of all the messages in the stream up to (and |
| including) the provided message. |
| |
| This method will block until an acknowledgement is sent to the broker. |
| After that, the messages will not be re-delivered to this consumer. |
| |
| **Args** |
| |
| * `message`: |
| The received message or message id. |
| """ |
| if isinstance(message, Message): |
| self._consumer.acknowledge_cumulative(message._message) |
| else: |
| self._consumer.acknowledge_cumulative(message) |
| |
| def negative_acknowledge(self, message): |
| """ |
| Acknowledge the failure to process a single message. |
| |
| When a message is "negatively acked" it will be marked for redelivery after |
| some fixed delay. The delay is configurable when constructing the consumer |
| with the `negative_ack_redelivery_delay_ms` option. |
| |
| This call is not blocking. |
| |
| **Args** |
| |
| * `message`: |
| The received message or message id. |
| """ |
| if isinstance(message, Message): |
| self._consumer.negative_acknowledge(message._message) |
| else: |
| self._consumer.negative_acknowledge(message) |
| |
| def pause_message_listener(self): |
| """ |
| Pause receiving messages via the `message_listener` until |
| `resume_message_listener()` is called. |
| """ |
| self._consumer.pause_message_listener() |
| |
| def resume_message_listener(self): |
| """ |
| Resume receiving the messages via the message listener. |
| Asynchronously receive all the messages enqueued from the time |
| `pause_message_listener()` was called. |
| """ |
| self._consumer.resume_message_listener() |
| |
| def redeliver_unacknowledged_messages(self): |
| """ |
| Redelivers all the unacknowledged messages. In failover mode, the |
| request is ignored if the consumer is not active for the given topic. In |
| shared mode, the consumer's messages to be redelivered are distributed |
| across all the connected consumers. This is a non-blocking call and |
| doesn't throw an exception. In case the connection breaks, the messages |
| are redelivered after reconnect. |
| """ |
| self._consumer.redeliver_unacknowledged_messages() |
| |
| def seek(self, messageid): |
| """ |
| Reset the subscription associated with this consumer to a specific message id or publish timestamp. |
| The message id can either be a specific message or represent the first or last messages in the topic. |
| Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead perform the |
| seek() on the individual partitions. |
| |
| **Args** |
| |
| * `messageid`: |
| The message id for seek, OR an integer event time to seek to |
| """ |
| self._consumer.seek(messageid) |
| |
| def close(self): |
| """ |
| Close the consumer. |
| """ |
| self._consumer.close() |
| self._client._consumers.remove(self) |
| |
| def is_connected(self): |
| """ |
| Check if the consumer is connected or not. |
| """ |
| return self._consumer.is_connected() |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.Consumer">Consumer</a></li> |
| </ul> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.acknowledge"> |
| <p>def <span class="ident">acknowledge</span>(</p><p>self, message)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Acknowledge the reception of a single message.</p> |
| <p>This method will block until an acknowledgement is sent to the broker. |
| After that, the message will not be re-delivered to this consumer.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>message</code>: |
| The received message or message id.</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.acknowledge', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.acknowledge" class="source"> |
| <pre><code>def acknowledge(self, message): |
| """ |
| Acknowledge the reception of a single message. |
| This method will block until an acknowledgement is sent to the broker. |
| After that, the message will not be re-delivered to this consumer. |
| **Args** |
| * `message`: |
| The received message or message id. |
| """ |
| if isinstance(message, Message): |
| self._consumer.acknowledge(message._message) |
| else: |
| self._consumer.acknowledge(message) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.acknowledge_cumulative"> |
| <p>def <span class="ident">acknowledge_cumulative</span>(</p><p>self, message)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Acknowledge the reception of all the messages in the stream up to (and |
| including) the provided message.</p> |
| <p>This method will block until an acknowledgement is sent to the broker. |
| After that, the messages will not be re-delivered to this consumer.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>message</code>: |
| The received message or message id.</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.acknowledge_cumulative', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.acknowledge_cumulative" class="source"> |
| <pre><code>def acknowledge_cumulative(self, message): |
| """ |
| Acknowledge the reception of all the messages in the stream up to (and |
| including) the provided message. |
| This method will block until an acknowledgement is sent to the broker. |
| After that, the messages will not be re-delivered to this consumer. |
| **Args** |
| * `message`: |
| The received message or message id. |
| """ |
| if isinstance(message, Message): |
| self._consumer.acknowledge_cumulative(message._message) |
| else: |
| self._consumer.acknowledge_cumulative(message) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.close"> |
| <p>def <span class="ident">close</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Close the consumer.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.close', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.close" class="source"> |
| <pre><code>def close(self): |
| """ |
| Close the consumer. |
| """ |
| self._consumer.close() |
| self._client._consumers.remove(self) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.is_connected"> |
| <p>def <span class="ident">is_connected</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Check if the consumer is connected or not.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.is_connected', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.is_connected" class="source"> |
| <pre><code>def is_connected(self): |
| """ |
| Check if the consumer is connected or not. |
| """ |
| return self._consumer.is_connected() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.negative_acknowledge"> |
| <p>def <span class="ident">negative_acknowledge</span>(</p><p>self, message)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Acknowledge the failure to process a single message.</p> |
| <p>When a message is "negatively acked" it will be marked for redelivery after |
| some fixed delay. The delay is configurable when constructing the consumer |
| with the <code>negative_ack_redelivery_delay_ms</code> option.</p> |
| <p>This call is not blocking.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>message</code>: |
| The received message or message id.</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.negative_acknowledge', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.negative_acknowledge" class="source"> |
| <pre><code>def negative_acknowledge(self, message): |
| """ |
| Acknowledge the failure to process a single message. |
| When a message is "negatively acked" it will be marked for redelivery after |
| some fixed delay. The delay is configurable when constructing the consumer |
| with the `negative_ack_redelivery_delay_ms` option. |
| This call is not blocking. |
| **Args** |
| * `message`: |
| The received message or message id. |
| """ |
| if isinstance(message, Message): |
| self._consumer.negative_acknowledge(message._message) |
| else: |
| self._consumer.negative_acknowledge(message) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.pause_message_listener"> |
| <p>def <span class="ident">pause_message_listener</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Pause receiving messages via the <code>message_listener</code> until |
| <code>resume_message_listener()</code> is called.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.pause_message_listener', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.pause_message_listener" class="source"> |
| <pre><code>def pause_message_listener(self): |
| """ |
| Pause receiving messages via the `message_listener` until |
| `resume_message_listener()` is called. |
| """ |
| self._consumer.pause_message_listener() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.receive"> |
| <p>def <span class="ident">receive</span>(</p><p>self, timeout_millis=None)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Receive a single message.</p> |
| <p>If a message is not immediately available, this method will block until |
| a new message is available.</p> |
| <p><strong>Options</strong></p> |
| <ul> |
| <li><code>timeout_millis</code>: |
| If specified, the receive will raise an exception if a message is not |
| available within the timeout.</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.receive', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.receive" class="source"> |
| <pre><code>def receive(self, timeout_millis=None): |
| """ |
| Receive a single message. |
| If a message is not immediately available, this method will block until |
| a new message is available. |
| **Options** |
| * `timeout_millis`: |
| If specified, the receive will raise an exception if a message is not |
| available within the timeout. |
| """ |
| if timeout_millis is None: |
| msg = self._consumer.receive() |
| else: |
| _check_type(int, timeout_millis, 'timeout_millis') |
| msg = self._consumer.receive(timeout_millis) |
| m = Message() |
| m._message = msg |
| m._schema = self._schema |
| return m |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.redeliver_unacknowledged_messages"> |
| <p>def <span class="ident">redeliver_unacknowledged_messages</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Redelivers all the unacknowledged messages. In failover mode, the |
| request is ignored if the consumer is not active for the given topic. In |
| shared mode, the consumer's messages to be redelivered are distributed |
| across all the connected consumers. This is a non-blocking call and |
| doesn't throw an exception. In case the connection breaks, the messages |
| are redelivered after reconnect.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.redeliver_unacknowledged_messages', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.redeliver_unacknowledged_messages" class="source"> |
| <pre><code>def redeliver_unacknowledged_messages(self): |
| """ |
| Redelivers all the unacknowledged messages. In failover mode, the |
| request is ignored if the consumer is not active for the given topic. In |
| shared mode, the consumer's messages to be redelivered are distributed |
| across all the connected consumers. This is a non-blocking call and |
| doesn't throw an exception. In case the connection breaks, the messages |
| are redelivered after reconnect. |
| """ |
| self._consumer.redeliver_unacknowledged_messages() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.resume_message_listener"> |
| <p>def <span class="ident">resume_message_listener</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Resume receiving the messages via the message listener. |
| Asynchronously receive all the messages enqueued from the time |
| <code>pause_message_listener()</code> was called.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.resume_message_listener', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.resume_message_listener" class="source"> |
| <pre><code>def resume_message_listener(self): |
| """ |
| Resume receiving the messages via the message listener. |
| Asynchronously receive all the messages enqueued from the time |
| `pause_message_listener()` was called. |
| """ |
| self._consumer.resume_message_listener() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.seek"> |
| <p>def <span class="ident">seek</span>(</p><p>self, messageid)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Reset the subscription associated with this consumer to a specific message id or publish timestamp. |
| The message id can either be a specific message or represent the first or last messages in the topic. |
| Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead perform the |
| seek() on the individual partitions.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>messageid</code>: |
| The message id for seek, OR an integer event time to seek to</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.seek', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.seek" class="source"> |
| <pre><code>def seek(self, messageid): |
| """ |
| Reset the subscription associated with this consumer to a specific message id or publish timestamp. |
| The message id can either be a specific message or represent the first or last messages in the topic. |
| Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead perform the |
| seek() on the individual partitions. |
| **Args** |
| * `messageid`: |
| The message id for seek, OR an integer event time to seek to |
| """ |
| self._consumer.seek(messageid) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.subscription_name"> |
| <p>def <span class="ident">subscription_name</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Return the subscription name.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.subscription_name', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.subscription_name" class="source"> |
| <pre><code>def subscription_name(self): |
| """ |
| Return the subscription name. |
| """ |
| return self._consumer.subscription_name() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.topic"> |
| <p>def <span class="ident">topic</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Return the topic this consumer is subscribed to.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.topic', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.topic" class="source"> |
| <pre><code>def topic(self): |
| """ |
| Return the topic this consumer is subscribed to. |
| """ |
| return self._consumer.topic() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Consumer.unsubscribe"> |
| <p>def <span class="ident">unsubscribe</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Unsubscribe the current consumer from the topic.</p> |
| <p>This method will block until the operation is completed. Once the |
| consumer is unsubscribed, no more messages will be received and |
| subsequent new messages will not be retained for this consumer.</p> |
| <p>This consumer object cannot be reused.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.unsubscribe', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Consumer.unsubscribe" class="source"> |
| <pre><code>def unsubscribe(self): |
| """ |
| Unsubscribe the current consumer from the topic. |
| This method will block until the operation is completed. Once the |
| consumer is unsubscribed, no more messages will be received and |
| subsequent new messages will not be retained for this consumer. |
| This consumer object cannot be reused. |
| """ |
| return self._consumer.unsubscribe() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.CryptoKeyReader" class="name">class <span class="ident">CryptoKeyReader</span></p> |
| |
| |
| <div class="desc"><p>Default crypto key reader implementation</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.CryptoKeyReader', this);">Show source ≡</a></p> |
| <div id="source-pulsar.CryptoKeyReader" class="source"> |
| <pre><code>class CryptoKeyReader: |
| """ |
| Default crypto key reader implementation |
| """ |
| def __init__(self, public_key_path, private_key_path): |
| """ |
| Create crypto key reader. |
| |
| **Args** |
| |
| * `public_key_path`: Path to the public key |
| * `private_key_path`: Path to private key |
| """ |
| _check_type(str, public_key_path, 'public_key_path') |
| _check_type(str, private_key_path, 'private_key_path') |
| self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.CryptoKeyReader">CryptoKeyReader</a></li> |
| </ul> |
| <h3>Instance variables</h3> |
| <div class="item"> |
| <p id="pulsar.CryptoKeyReader.cryptoKeyReader" class="name">var <span class="ident">cryptoKeyReader</span></p> |
| |
| |
| |
| |
| <div class="source_cont"> |
| </div> |
| |
| </div> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.CryptoKeyReader.__init__"> |
| <p>def <span class="ident">__init__</span>(</p><p>self, public_key_path, private_key_path)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Create crypto key reader.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>public_key_path</code>: Path to the public key</li> |
| <li><code>private_key_path</code>: Path to private key</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.CryptoKeyReader.__init__', this);">Show source ≡</a></p> |
| <div id="source-pulsar.CryptoKeyReader.__init__" class="source"> |
| <pre><code>def __init__(self, public_key_path, private_key_path): |
| """ |
| Create crypto key reader. |
| **Args** |
| * `public_key_path`: Path to the public key |
| * `private_key_path`: Path to private key |
| """ |
| _check_type(str, public_key_path, 'public_key_path') |
| _check_type(str, private_key_path, 'private_key_path') |
| self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.Message" class="name">class <span class="ident">Message</span></p> |
| |
| |
| <div class="desc"><p>Message objects are returned by a consumer, either by calling <code>receive</code> or |
| through a listener.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message" class="source"> |
| <pre><code>class Message: |
| """ |
| Message objects are returned by a consumer, either by calling `receive` or |
| through a listener. |
| """ |
| |
| def data(self): |
| """ |
| Returns the payload of the message as a bytes object. |
| """ |
| return self._message.data() |
| |
| def value(self): |
| """ |
| Returns object with the de-serialized version of the message content |
| """ |
| return self._schema.decode(self._message.data()) |
| |
| def properties(self): |
| """ |
| Return the properties attached to the message. Properties are |
| application-defined key/value pairs that will be attached to the |
| message. |
| """ |
| return self._message.properties() |
| |
| def partition_key(self): |
| """ |
| Get the partitioning key for the message. |
| """ |
| return self._message.partition_key() |
| |
| def publish_timestamp(self): |
| """ |
| Get the timestamp in milliseconds with the message publish time. |
| """ |
| return self._message.publish_timestamp() |
| |
| def event_timestamp(self): |
| """ |
| Get the timestamp in milliseconds with the message event time. |
| """ |
| return self._message.event_timestamp() |
| |
| def message_id(self): |
| """ |
| The message ID that can be used to refer to this particular message. |
| """ |
| return self._message.message_id() |
| |
| def topic_name(self): |
| """ |
| Get the name of the topic from which this message originated |
| """ |
| return self._message.topic_name() |
| |
| def redelivery_count(self): |
| """ |
| Get the redelivery count for this message |
| """ |
| return self._message.redelivery_count() |
| |
| def schema_version(self): |
| """ |
| Get the schema version for this message |
| """ |
| return self._message.schema_version() |
| |
| @staticmethod |
| def _wrap(_message): |
| self = Message() |
| self._message = _message |
| return self |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.Message">Message</a></li> |
| </ul> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.data"> |
| <p>def <span class="ident">data</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Returns the payload of the message as a bytes object.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.data', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.data" class="source"> |
| <pre><code>def data(self): |
| """ |
| Returns the payload of the message as a bytes object. |
| """ |
| return self._message.data() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.event_timestamp"> |
| <p>def <span class="ident">event_timestamp</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Get the timestamp in milliseconds with the message event time.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.event_timestamp', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.event_timestamp" class="source"> |
| <pre><code>def event_timestamp(self): |
| """ |
| Get the timestamp in milliseconds with the message event time. |
| """ |
| return self._message.event_timestamp() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.message_id"> |
| <p>def <span class="ident">message_id</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>The message ID that can be used to refer to this particular message.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.message_id', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.message_id" class="source"> |
| <pre><code>def message_id(self): |
| """ |
| The message ID that can be used to refer to this particular message. |
| """ |
| return self._message.message_id() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.partition_key"> |
| <p>def <span class="ident">partition_key</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Get the partitioning key for the message.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.partition_key', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.partition_key" class="source"> |
| <pre><code>def partition_key(self): |
| """ |
| Get the partitioning key for the message. |
| """ |
| return self._message.partition_key() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.properties"> |
| <p>def <span class="ident">properties</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Return the properties attached to the message. Properties are |
| application-defined key/value pairs that will be attached to the |
| message.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.properties', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.properties" class="source"> |
| <pre><code>def properties(self): |
| """ |
| Return the properties attached to the message. Properties are |
| application-defined key/value pairs that will be attached to the |
| message. |
| """ |
| return self._message.properties() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.publish_timestamp"> |
| <p>def <span class="ident">publish_timestamp</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Get the timestamp in milliseconds with the message publish time.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.publish_timestamp', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.publish_timestamp" class="source"> |
| <pre><code>def publish_timestamp(self): |
| """ |
| Get the timestamp in milliseconds with the message publish time. |
| """ |
| return self._message.publish_timestamp() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.redelivery_count"> |
| <p>def <span class="ident">redelivery_count</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Get the redelivery count for this message</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.redelivery_count', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.redelivery_count" class="source"> |
| <pre><code>def redelivery_count(self): |
| """ |
| Get the redelivery count for this message |
| """ |
| return self._message.redelivery_count() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.schema_version"> |
| <p>def <span class="ident">schema_version</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Get the schema version for this message</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.schema_version', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.schema_version" class="source"> |
| <pre><code>def schema_version(self): |
| """ |
| Get the schema version for this message |
| """ |
| return self._message.schema_version() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.topic_name"> |
| <p>def <span class="ident">topic_name</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Get the name of the topic from which this message originated</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.topic_name', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.topic_name" class="source"> |
| <pre><code>def topic_name(self): |
| """ |
| Get the topic Name from which this message originated from |
| """ |
| return self._message.topic_name() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Message.value"> |
| <p>def <span class="ident">value</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Returns object with the de-serialized version of the message content</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.value', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Message.value" class="source"> |
| <pre><code>def value(self): |
| """ |
| Returns object with the de-serialized version of the message content |
| """ |
| return self._schema.decode(self._message.data()) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.MessageBatch" class="name">class <span class="ident">MessageBatch</span></p> |
| |
| |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageBatch" class="source"> |
| <pre><code>class MessageBatch: |
| |
| def __init__(self): |
| self._msg_batch = _pulsar.MessageBatch() |
| |
| def with_message_id(self, msg_id): |
| if not isinstance(msg_id, _pulsar.MessageId): |
| if isinstance(msg_id, MessageId): |
| msg_id = msg_id._msg_id |
| else: |
| raise TypeError("unknown message id type") |
| self._msg_batch.with_message_id(msg_id) |
| return self |
| |
| def parse_from(self, data, size): |
| self._msg_batch.parse_from(data, size) |
| _msgs = self._msg_batch.messages() |
| return list(map(Message._wrap, _msgs)) |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.MessageBatch">MessageBatch</a></li> |
| </ul> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageBatch.__init__"> |
| <p>def <span class="ident">__init__</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.__init__', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageBatch.__init__" class="source"> |
| <pre><code>def __init__(self): |
| self._msg_batch = _pulsar.MessageBatch() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageBatch.parse_from"> |
| <p>def <span class="ident">parse_from</span>(</p><p>self, data, size)</p> |
| </div> |
| |
| |
| |
| |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.parse_from', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageBatch.parse_from" class="source"> |
| <pre><code>def parse_from(self, data, size): |
| self._msg_batch.parse_from(data, size) |
| _msgs = self._msg_batch.messages() |
| return list(map(Message._wrap, _msgs)) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageBatch.with_message_id"> |
| <p>def <span class="ident">with_message_id</span>(</p><p>self, msg_id)</p> |
| </div> |
| |
| |
| |
| |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.with_message_id', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageBatch.with_message_id" class="source"> |
| <pre><code>def with_message_id(self, msg_id): |
| if not isinstance(msg_id, _pulsar.MessageId): |
| if isinstance(msg_id, MessageId): |
| msg_id = msg_id._msg_id |
| else: |
| raise TypeError("unknown message id type") |
| self._msg_batch.with_message_id(msg_id) |
| return self |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.MessageId" class="name">class <span class="ident">MessageId</span></p> |
| |
| |
| <div class="desc"><p>Represents a message id</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageId" class="source"> |
| <pre><code>class MessageId: |
| """ |
| Represents a message id |
| """ |
| |
| def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1): |
| self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index) |
| |
| 'Represents the earliest message stored in a topic' |
| earliest = _pulsar.MessageId.earliest |
| |
| 'Represents the latest message published on a topic' |
| latest = _pulsar.MessageId.latest |
| |
| def ledger_id(self): |
| return self._msg_id.ledger_id() |
| |
| def entry_id(self): |
| return self._msg_id.entry_id() |
| |
| def batch_index(self): |
| return self._msg_id.batch_index() |
| |
| def partition(self): |
| return self._msg_id.partition() |
| |
| def serialize(self): |
| """ |
| Returns a bytes representation of the message id. |
| This bytes sequence can be stored and later deserialized. |
| """ |
| return self._msg_id.serialize() |
| |
| @staticmethod |
| def deserialize(message_id_bytes): |
| """ |
| Deserialize a message id object from a previously |
| serialized bytes sequence. |
| """ |
| return _pulsar.MessageId.deserialize(message_id_bytes) |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.MessageId">MessageId</a></li> |
| </ul> |
| <h3>Class variables</h3> |
| <div class="item"> |
| <p id="pulsar.MessageId.earliest" class="name">var <span class="ident">earliest</span></p> |
| |
| |
| |
| |
| <div class="desc"><p>Represents the earliest message stored in a topic</p></div> |
| <div class="source_cont"> |
| </div> |
| |
| </div> |
| <div class="item"> |
| <p id="pulsar.MessageId.latest" class="name">var <span class="ident">latest</span></p> |
| |
| |
| |
| |
| <div class="desc"><p>Represents the latest message published on a topic</p></div> |
| <div class="source_cont"> |
| </div> |
| |
| </div> |
| <h3>Static methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageId.deserialize"> |
| <p>def <span class="ident">deserialize</span>(</p><p>message_id_bytes)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Deserialize a message id object from a previously |
| serialized bytes sequence.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.deserialize', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageId.deserialize" class="source"> |
| <pre><code>@staticmethod |
| def deserialize(message_id_bytes): |
| """ |
| Deserialize a message id object from a previously |
| serialized bytes sequence. |
| """ |
| return _pulsar.MessageId.deserialize(message_id_bytes) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageId.__init__"> |
| <p>def <span class="ident">__init__</span>(</p><p>self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1)</p> |
| </div> |
| |
| |
| |
| |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.__init__', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageId.__init__" class="source"> |
| <pre><code>def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1): |
| self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageId.batch_index"> |
| <p>def <span class="ident">batch_index</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.batch_index', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageId.batch_index" class="source"> |
| <pre><code>def batch_index(self): |
| return self._msg_id.batch_index() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageId.entry_id"> |
| <p>def <span class="ident">entry_id</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.entry_id', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageId.entry_id" class="source"> |
| <pre><code>def entry_id(self): |
| return self._msg_id.entry_id() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageId.ledger_id"> |
| <p>def <span class="ident">ledger_id</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.ledger_id', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageId.ledger_id" class="source"> |
| <pre><code>def ledger_id(self): |
| return self._msg_id.ledger_id() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageId.partition"> |
| <p>def <span class="ident">partition</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.partition', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageId.partition" class="source"> |
| <pre><code>def partition(self): |
| return self._msg_id.partition() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.MessageId.serialize"> |
| <p>def <span class="ident">serialize</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Returns a bytes representation of the message id. |
| This bytes sequence can be stored and later deserialized.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.serialize', this);">Show source ≡</a></p> |
| <div id="source-pulsar.MessageId.serialize" class="source"> |
| <pre><code>def serialize(self): |
| """ |
| Returns a bytes representation of the message id. |
| This bytes sequence can be stored and later deserialized. |
| """ |
| return self._msg_id.serialize() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.Producer" class="name">class <span class="ident">Producer</span></p> |
| |
| |
| <div class="desc"><p>The Pulsar message producer, used to publish messages on a topic.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Producer" class="source"> |
| <pre><code>class Producer: |
| """ |
| The Pulsar message producer, used to publish messages on a topic. |
| """ |
| |
| def topic(self): |
| """ |
| Return the topic which producer is publishing to |
| """ |
| return self._producer.topic() |
| |
| def producer_name(self): |
| """ |
| Return the producer name which could have been assigned by the |
| system or specified by the client |
| """ |
| return self._producer.producer_name() |
| |
| def last_sequence_id(self): |
| """ |
| Get the last sequence id that was published by this producer. |
| |
| This represent either the automatically assigned or custom sequence id |
| (set on the `MessageBuilder`) that was published and acknowledged by the broker. |
| |
| After recreating a producer with the same producer name, this will return the |
| last message that was published in the previous producer session, or -1 if |
| there no message was ever published. |
| """ |
| return self._producer.last_sequence_id() |
| |
| def send(self, content, |
| properties=None, |
| partition_key=None, |
| sequence_id=None, |
| replication_clusters=None, |
| disable_replication=False, |
| event_timestamp=None, |
| deliver_at=None, |
| deliver_after=None, |
| ): |
| """ |
| Publish a message on the topic. Blocks until the message is acknowledged |
| |
| Returns a `MessageId` object that represents where the message is persisted. |
| |
| **Args** |
| |
| * `content`: |
| A `bytes` object with the message payload. |
| |
| **Options** |
| |
| * `properties`: |
| A dict of application-defined string properties. |
| * `partition_key`: |
| Sets the partition key for message routing. A hash of this key is used |
| to determine the message's topic partition. |
| * `sequence_id`: |
| Specify a custom sequence id for the message being published. |
| * `replication_clusters`: |
| Override namespace replication clusters. Note that it is the caller's |
| responsibility to provide valid cluster names and that all clusters |
| have been previously configured as topics. Given an empty list, |
| the message will replicate according to the namespace configuration. |
| * `disable_replication`: |
| Do not replicate this message. |
| * `event_timestamp`: |
| Timestamp in millis of the timestamp of event creation |
| * `deliver_at`: |
| Specify the this message should not be delivered earlier than the |
| specified timestamp. |
| The timestamp is milliseconds and based on UTC |
| * `deliver_after`: |
| Specify a delay in timedelta for the delivery of the messages. |
| |
| """ |
| msg = self._build_msg(content, properties, partition_key, sequence_id, |
| replication_clusters, disable_replication, event_timestamp, |
| deliver_at, deliver_after) |
| return MessageId.deserialize(self._producer.send(msg)) |
| |
| def send_async(self, content, callback, |
| properties=None, |
| partition_key=None, |
| sequence_id=None, |
| replication_clusters=None, |
| disable_replication=False, |
| event_timestamp=None, |
| deliver_at=None, |
| deliver_after=None, |
| ): |
| """ |
| Send a message asynchronously. |
| |
| The `callback` will be invoked once the message has been acknowledged |
| by the broker. |
| |
| Example: |
| |
| #!python |
| def callback(res, msg_id): |
| print('Message published: %s' % res) |
| |
| producer.send_async(msg, callback) |
| |
| When the producer queue is full, by default the message will be rejected |
| and the callback invoked with an error code. |
| |
| **Args** |
| |
| * `content`: |
| A `bytes` object with the message payload. |
| |
| **Options** |
| |
| * `properties`: |
| A dict of application0-defined string properties. |
| * `partition_key`: |
| Sets the partition key for the message routing. A hash of this key is |
| used to determine the message's topic partition. |
| * `sequence_id`: |
| Specify a custom sequence id for the message being published. |
| * `replication_clusters`: Override namespace replication clusters. Note |
| that it is the caller's responsibility to provide valid cluster names |
| and that all clusters have been previously configured as topics. |
| Given an empty list, the message will replicate per the namespace |
| configuration. |
| * `disable_replication`: |
| Do not replicate this message. |
| * `event_timestamp`: |
| Timestamp in millis of the timestamp of event creation |
| * `deliver_at`: |
| Specify the this message should not be delivered earlier than the |
| specified timestamp. |
| The timestamp is milliseconds and based on UTC |
| * `deliver_after`: |
| Specify a delay in timedelta for the delivery of the messages. |
| """ |
| msg = self._build_msg(content, properties, partition_key, sequence_id, |
| replication_clusters, disable_replication, event_timestamp, |
| deliver_at, deliver_after) |
| self._producer.send_async(msg, callback) |
| |
| |
| def flush(self): |
| """ |
| Flush all the messages buffered in the client and wait until all messages have been |
| successfully persisted |
| """ |
| self._producer.flush() |
| |
| |
| def close(self): |
| """ |
| Close the producer. |
| """ |
| self._producer.close() |
| |
| def _build_msg(self, content, properties, partition_key, sequence_id, |
| replication_clusters, disable_replication, event_timestamp, |
| deliver_at, deliver_after): |
| data = self._schema.encode(content) |
| |
| _check_type(bytes, data, 'data') |
| _check_type_or_none(dict, properties, 'properties') |
| _check_type_or_none(str, partition_key, 'partition_key') |
| _check_type_or_none(int, sequence_id, 'sequence_id') |
| _check_type_or_none(list, replication_clusters, 'replication_clusters') |
| _check_type(bool, disable_replication, 'disable_replication') |
| _check_type_or_none(int, event_timestamp, 'event_timestamp') |
| _check_type_or_none(int, deliver_at, 'deliver_at') |
| _check_type_or_none(timedelta, deliver_after, 'deliver_after') |
| |
| mb = _pulsar.MessageBuilder() |
| mb.content(data) |
| if properties: |
| for k, v in properties.items(): |
| mb.property(k, v) |
| if partition_key: |
| mb.partition_key(partition_key) |
| if sequence_id: |
| mb.sequence_id(sequence_id) |
| if replication_clusters: |
| mb.replication_clusters(replication_clusters) |
| if disable_replication: |
| mb.disable_replication(disable_replication) |
| if event_timestamp: |
| mb.event_timestamp(event_timestamp) |
| if deliver_at: |
| mb.deliver_at(deliver_at) |
| if deliver_after: |
| mb.deliver_after(deliver_after) |
| |
| return mb.build() |
| |
| def is_connected(self): |
| """ |
| Check if the producer is connected or not. |
| """ |
| return self._producer.is_connected() |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.Producer">Producer</a></li> |
| </ul> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Producer.close"> |
| <p>def <span class="ident">close</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Close the producer.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.close', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Producer.close" class="source"> |
| <pre><code>def close(self): |
| """ |
| Close the producer. |
| """ |
| self._producer.close() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Producer.flush"> |
| <p>def <span class="ident">flush</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Flush all the messages buffered in the client and wait until all messages have been |
| successfully persisted</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.flush', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Producer.flush" class="source"> |
| <pre><code>def flush(self): |
| """ |
| Flush all the messages buffered in the client and wait until all messages have been |
| successfully persisted |
| """ |
| self._producer.flush() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Producer.is_connected"> |
| <p>def <span class="ident">is_connected</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Check if the producer is connected or not.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.is_connected', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Producer.is_connected" class="source"> |
| <pre><code>def is_connected(self): |
| """ |
| Check if the producer is connected or not. |
| """ |
| return self._producer.is_connected() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Producer.last_sequence_id"> |
| <p>def <span class="ident">last_sequence_id</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Get the last sequence id that was published by this producer.</p> |
| <p>This represents either the automatically assigned or custom sequence id |
| (set on the <code>MessageBuilder</code>) that was published and acknowledged by the broker.</p> |
| <p>After recreating a producer with the same producer name, this will return the |
| last sequence id that was published in the previous producer session, or -1 if |
| no message was ever published.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.last_sequence_id', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Producer.last_sequence_id" class="source"> |
| <pre><code>def last_sequence_id(self): |
| """ |
| Get the last sequence id that was published by this producer. |
| This represent either the automatically assigned or custom sequence id |
| (set on the `MessageBuilder`) that was published and acknowledged by the broker. |
| After recreating a producer with the same producer name, this will return the |
| last message that was published in the previous producer session, or -1 if |
| there no message was ever published. |
| """ |
| return self._producer.last_sequence_id() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Producer.producer_name"> |
| <p>def <span class="ident">producer_name</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Return the producer name which could have been assigned by the |
| system or specified by the client</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.producer_name', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Producer.producer_name" class="source"> |
| <pre><code>def producer_name(self): |
| """ |
| Return the producer name which could have been assigned by the |
| system or specified by the client |
| """ |
| return self._producer.producer_name() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Producer.send"> |
| <p>def <span class="ident">send</span>(</p><p>self, content, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Publish a message on the topic. Blocks until the message is acknowledged</p> |
| <p>Returns a <code>MessageId</code> object that represents where the message is persisted.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>content</code>: |
| A <code>bytes</code> object with the message payload.</li> |
| </ul> |
| <p><strong>Options</strong></p> |
| <ul> |
| <li><code>properties</code>: |
| A dict of application-defined string properties.</li> |
| <li><code>partition_key</code>: |
| Sets the partition key for message routing. A hash of this key is used |
| to determine the message's topic partition.</li> |
| <li><code>sequence_id</code>: |
| Specify a custom sequence id for the message being published.</li> |
| <li><code>replication_clusters</code>: |
| Override namespace replication clusters. Note that it is the caller's |
| responsibility to provide valid cluster names and that all clusters |
| have been previously configured as topics. Given an empty list, |
| the message will replicate according to the namespace configuration.</li> |
| <li><code>disable_replication</code>: |
| Do not replicate this message.</li> |
| <li><code>event_timestamp</code>: |
| Timestamp in milliseconds of the event creation</li> |
| <li><code>deliver_at</code>: |
| Specify that this message should not be delivered earlier than the |
| specified timestamp. |
| The timestamp is in milliseconds and based on UTC</li> |
| <li><code>deliver_after</code>: |
| Specify a delay in timedelta for the delivery of the messages.</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.send', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Producer.send" class="source"> |
| <pre><code>def send(self, content, |
| properties=None, |
| partition_key=None, |
| sequence_id=None, |
| replication_clusters=None, |
| disable_replication=False, |
| event_timestamp=None, |
| deliver_at=None, |
| deliver_after=None, |
| ): |
| """ |
| Publish a message on the topic. Blocks until the message is acknowledged |
| Returns a `MessageId` object that represents where the message is persisted. |
| **Args** |
| * `content`: |
| A `bytes` object with the message payload. |
| **Options** |
| * `properties`: |
| A dict of application-defined string properties. |
| * `partition_key`: |
| Sets the partition key for message routing. A hash of this key is used |
| to determine the message's topic partition. |
| * `sequence_id`: |
| Specify a custom sequence id for the message being published. |
| * `replication_clusters`: |
| Override namespace replication clusters. Note that it is the caller's |
| responsibility to provide valid cluster names and that all clusters |
| have been previously configured as topics. Given an empty list, |
| the message will replicate according to the namespace configuration. |
| * `disable_replication`: |
| Do not replicate this message. |
| * `event_timestamp`: |
| Timestamp in millis of the timestamp of event creation |
| * `deliver_at`: |
| Specify the this message should not be delivered earlier than the |
| specified timestamp. |
| The timestamp is milliseconds and based on UTC |
| * `deliver_after`: |
| Specify a delay in timedelta for the delivery of the messages. |
| """ |
| msg = self._build_msg(content, properties, partition_key, sequence_id, |
| replication_clusters, disable_replication, event_timestamp, |
| deliver_at, deliver_after) |
| return MessageId.deserialize(self._producer.send(msg)) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Producer.send_async"> |
| <p>def <span class="ident">send_async</span>(</p><p>self, content, callback, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Send a message asynchronously.</p> |
| <p>The <code>callback</code> will be invoked once the message has been acknowledged |
| by the broker.</p> |
| <p>Example:</p> |
| <pre><code>#!python |
| def callback(res, msg_id): |
| print('Message published: %s' % res) |
| |
| producer.send_async(msg, callback) |
| </code></pre> |
| <p>When the producer queue is full, by default the message will be rejected |
| and the callback invoked with an error code.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>content</code>: |
| A <code>bytes</code> object with the message payload.</li> |
| </ul> |
| <p><strong>Options</strong></p> |
| <ul> |
| <li><code>properties</code>: |
| A dict of application-defined string properties.</li> |
| <li><code>partition_key</code>: |
| Sets the partition key for the message routing. A hash of this key is |
| used to determine the message's topic partition.</li> |
| <li><code>sequence_id</code>: |
| Specify a custom sequence id for the message being published.</li> |
| <li><code>replication_clusters</code>: Override namespace replication clusters. Note |
| that it is the caller's responsibility to provide valid cluster names |
| and that all clusters have been previously configured as topics. |
| Given an empty list, the message will replicate per the namespace |
| configuration.</li> |
| <li><code>disable_replication</code>: |
| Do not replicate this message.</li> |
| <li><code>event_timestamp</code>: |
| Timestamp in milliseconds of the event creation</li> |
| <li><code>deliver_at</code>: |
| Specify that this message should not be delivered earlier than the |
| specified timestamp. |
| The timestamp is in milliseconds and based on UTC</li> |
| <li><code>deliver_after</code>: |
| Specify a delay in timedelta for the delivery of the messages.</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.send_async', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Producer.send_async" class="source"> |
| <pre><code>def send_async(self, content, callback, |
| properties=None, |
| partition_key=None, |
| sequence_id=None, |
| replication_clusters=None, |
| disable_replication=False, |
| event_timestamp=None, |
| deliver_at=None, |
| deliver_after=None, |
| ): |
| """ |
| Send a message asynchronously. |
| The `callback` will be invoked once the message has been acknowledged |
| by the broker. |
| Example: |
| #!python |
| def callback(res, msg_id): |
| print('Message published: %s' % res) |
| producer.send_async(msg, callback) |
| When the producer queue is full, by default the message will be rejected |
| and the callback invoked with an error code. |
| **Args** |
| * `content`: |
| A `bytes` object with the message payload. |
| **Options** |
| * `properties`: |
| A dict of application0-defined string properties. |
| * `partition_key`: |
| Sets the partition key for the message routing. A hash of this key is |
| used to determine the message's topic partition. |
| * `sequence_id`: |
| Specify a custom sequence id for the message being published. |
| * `replication_clusters`: Override namespace replication clusters. Note |
| that it is the caller's responsibility to provide valid cluster names |
| and that all clusters have been previously configured as topics. |
| Given an empty list, the message will replicate per the namespace |
| configuration. |
| * `disable_replication`: |
| Do not replicate this message. |
| * `event_timestamp`: |
| Timestamp in millis of the timestamp of event creation |
| * `deliver_at`: |
| Specify the this message should not be delivered earlier than the |
| specified timestamp. |
| The timestamp is milliseconds and based on UTC |
| * `deliver_after`: |
| Specify a delay in timedelta for the delivery of the messages. |
| """ |
| msg = self._build_msg(content, properties, partition_key, sequence_id, |
| replication_clusters, disable_replication, event_timestamp, |
| deliver_at, deliver_after) |
| self._producer.send_async(msg, callback) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Producer.topic"> |
| <p>def <span class="ident">topic</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Return the topic which the producer is publishing to</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.topic', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Producer.topic" class="source"> |
| <pre><code>def topic(self): |
| """ |
| Return the topic which producer is publishing to |
| """ |
| return self._producer.topic() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <div class="item"> |
| <p id="pulsar.Reader" class="name">class <span class="ident">Reader</span></p> |
| |
| |
| <div class="desc"><p>Pulsar topic reader.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Reader" class="source"> |
| <pre><code>class Reader: |
| """ |
| Pulsar topic reader. |
| """ |
| |
| def topic(self): |
| """ |
| Return the topic this reader is reading from. |
| """ |
| return self._reader.topic() |
| |
| def read_next(self, timeout_millis=None): |
| """ |
| Read a single message. |
| |
| If a message is not immediately available, this method will block until |
| a new message is available. |
| |
| **Options** |
| |
| * `timeout_millis`: |
| If specified, the receive will raise an exception if a message is not |
| available within the timeout. |
| """ |
| if timeout_millis is None: |
| msg = self._reader.read_next() |
| else: |
| _check_type(int, timeout_millis, 'timeout_millis') |
| msg = self._reader.read_next(timeout_millis) |
| |
| m = Message() |
| m._message = msg |
| m._schema = self._schema |
| return m |
| |
| def has_message_available(self): |
| """ |
| Check if there is any message available to read from the current position. |
| """ |
| return self._reader.has_message_available(); |
| |
| def seek(self, messageid): |
| """ |
| Reset this reader to a specific message id or publish timestamp. |
| The message id can either be a specific message or represent the first or last messages in the topic. |
| Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead |
| perform the seek() on the individual partitions. |
| |
| **Args** |
| |
| * `messageid`: |
| The message id for seek, OR an integer event time to seek to |
| """ |
| self._reader.seek(messageid) |
| |
| def close(self): |
| """ |
| Close the reader. |
| """ |
| self._reader.close() |
| self._client._consumers.remove(self) |
| |
| def is_connected(self): |
| """ |
| Check if the reader is connected or not. |
| """ |
| return self._reader.is_connected() |
| </code></pre> |
| </div> |
| </div> |
| |
| |
| <div class="class"> |
| <h3>Ancestors (in MRO)</h3> |
| <ul class="class_list"> |
| <li><a href="#pulsar.Reader">Reader</a></li> |
| </ul> |
| <h3>Methods</h3> |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Reader.close"> |
| <p>def <span class="ident">close</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Close the reader.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.close', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Reader.close" class="source"> |
| <pre><code>def close(self): |
| """ |
| Close the reader. |
| """ |
| self._reader.close() |
| self._client._consumers.remove(self) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Reader.has_message_available"> |
| <p>def <span class="ident">has_message_available</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Check if there is any message available to read from the current position.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.has_message_available', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Reader.has_message_available" class="source"> |
| <pre><code>def has_message_available(self): |
| """ |
| Check if there is any message available to read from the current position. |
| """ |
| return self._reader.has_message_available(); |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Reader.is_connected"> |
| <p>def <span class="ident">is_connected</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Check if the reader is connected or not.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.is_connected', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Reader.is_connected" class="source"> |
| <pre><code>def is_connected(self): |
| """ |
| Check if the reader is connected or not. |
| """ |
| return self._reader.is_connected() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Reader.read_next"> |
| <p>def <span class="ident">read_next</span>(</p><p>self, timeout_millis=None)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Read a single message.</p> |
| <p>If a message is not immediately available, this method will block until |
| a new message is available.</p> |
| <p><strong>Options</strong></p> |
| <ul> |
| <li><code>timeout_millis</code>: |
| If specified, the receive will raise an exception if a message is not |
| available within the timeout.</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.read_next', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Reader.read_next" class="source"> |
| <pre><code>def read_next(self, timeout_millis=None): |
| """ |
| Read a single message. |
| If a message is not immediately available, this method will block until |
| a new message is available. |
| **Options** |
| * `timeout_millis`: |
| If specified, the receive will raise an exception if a message is not |
| available within the timeout. |
| """ |
| if timeout_millis is None: |
| msg = self._reader.read_next() |
| else: |
| _check_type(int, timeout_millis, 'timeout_millis') |
| msg = self._reader.read_next(timeout_millis) |
| m = Message() |
| m._message = msg |
| m._schema = self._schema |
| return m |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Reader.seek"> |
| <p>def <span class="ident">seek</span>(</p><p>self, messageid)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Reset this reader to a specific message id or publish timestamp. |
| The message id can either be a specific message or represent the first or last messages in the topic. |
| Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead |
| perform the seek() on the individual partitions.</p> |
| <p><strong>Args</strong></p> |
| <ul> |
| <li><code>messageid</code>: |
| The message id for seek, OR an integer event time to seek to</li> |
| </ul></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.seek', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Reader.seek" class="source"> |
| <pre><code>def seek(self, messageid): |
| """ |
| Reset this reader to a specific message id or publish timestamp. |
| The message id can either be a specific message or represent the first or last messages in the topic. |
| Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead |
| perform the seek() on the individual partitions. |
| **Args** |
| * `messageid`: |
| The message id for seek, OR an integer event time to seek to |
| """ |
| self._reader.seek(messageid) |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| |
| <div class="item"> |
| <div class="name def" id="pulsar.Reader.topic"> |
| <p>def <span class="ident">topic</span>(</p><p>self)</p> |
| </div> |
| |
| |
| |
| |
| <div class="desc"><p>Return the topic this reader is reading from.</p></div> |
| <div class="source_cont"> |
| <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.topic', this);">Show source ≡</a></p> |
| <div id="source-pulsar.Reader.topic" class="source"> |
| <pre><code>def topic(self): |
| """ |
| Return the topic this reader is reading from. |
| """ |
| return self._reader.topic() |
| </code></pre> |
| </div> |
| </div> |
| |
| </div> |
| |
| </div> |
| </div> |
| |
| <h2 class="section-title" id="header-submodules">Sub-modules</h2> |
| <div class="item"> |
| <p class="name"><a href="exceptions.m.html">pulsar.exceptions</a></p> |
| |
| |
| |
| </div> |
| <div class="item"> |
| <p class="name"><a href="functions/index.html">pulsar.functions</a></p> |
| |
| |
| |
| </div> |
| <div class="item"> |
| <p class="name"><a href="schema/index.html">pulsar.schema</a></p> |
| |
| |
| |
| </div> |
| </section> |
| |
| </article> |
| <div class="clear"> </div> |
| <footer id="footer"> |
| <p> |
| Documentation generated by |
| <a href="https://github.com/BurntSushi/pdoc">pdoc 0.3.2</a> |
| </p> |
| |
| <p>pdoc is in the public domain with the |
| <a href="http://unlicense.org">UNLICENSE</a></p> |
| |
| <p>Design by <a href="http://nadh.in">Kailash Nadh</a></p> |
| </footer> |
| </div> |
| </body> |
| </html> |