blob: c652a1212ab2253eee518d0abc78b177f8f3095e [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>apache_beam.io.gcp.dicomio &mdash; Apache Beam 2.38.0 documentation</title>
<script type="text/javascript" src="../../../../_static/js/modernizr.min.js"></script>
<script type="text/javascript" id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script>
<script type="text/javascript" src="../../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../../_static/language_data.js"></script>
<script async="async" type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script type="text/javascript" src="../../../../_static/js/theme.js"></script>
<link rel="stylesheet" href="../../../../_static/css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../../../_static/pygments.css" type="text/css" />
<link rel="index" title="Index" href="../../../../genindex.html" />
<link rel="search" title="Search" href="../../../../search.html" />
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../../index.html" class="icon icon-home"> Apache Beam
</a>
<div class="version">
2.38.0
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.ml.html">apache_beam.ml package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li>
</ul>
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../../index.html">Apache Beam</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../../index.html">Module code</a> &raquo;</li>
<li>apache_beam.io.gcp.dicomio</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for apache_beam.io.gcp.dicomio</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;DICOM IO connector</span>
<span class="sd">This module implements several tools to facilitate the interaction between</span>
<span class="sd">a Google Cloud Healthcare DICOM store and a Beam pipeline.</span>
<span class="sd">For more details on DICOM store and API:</span>
<span class="sd">https://cloud.google.com/healthcare/docs/how-tos/dicom</span>
<span class="sd">The DICOM IO connector can be used to search metadata or write DICOM files</span>
<span class="sd">to DICOM store.</span>
<span class="sd">When used together with Google Pubsub message connector, the</span>
<span class="sd">`FormatToQido` PTransform implemented in this module can be used</span>
<span class="sd">to convert Pubsub messages to search requests.</span>
<span class="sd">Since traceability is crucial for healthcare</span>
<span class="sd">API users, every input or error message will be recorded in the output of</span>
<span class="sd">the DICOM IO connector. As a result, every PTransform in this module will</span>
<span class="sd">return a PCollection of dicts that encode results and detailed error messages.</span>
<span class="sd">Search instance&#39;s metadata (QIDO request)</span>
<span class="sd">===================================================</span>
<span class="sd">DicomSearch() wraps the QIDO request client and supports 3 levels of search.</span>
<span class="sd">Users should specify the level by setting the &#39;search_type&#39; entry in the input</span>
<span class="sd">dict. They can also refine the search by adding tags to filter the results using</span>
<span class="sd">the &#39;params&#39; entry. Here is a sample usage:</span>
<span class="sd"> with Pipeline() as p:</span>
<span class="sd"> input_dict = p | beam.Create(</span>
<span class="sd"> [{&#39;project_id&#39;: &#39;abc123&#39;, &#39;search_type&#39;: &#39;instances&#39;,...},</span>
<span class="sd"> {&#39;project_id&#39;: &#39;dicom_go&#39;, &#39;search_type&#39;: &#39;series&#39;,...}])</span>
<span class="sd"> results = input_dict | io.gcp.DicomSearch()</span>
<span class="sd"> results | &#39;print successful search&#39; &gt;&gt; beam.Map(</span>
<span class="sd"> lambda x: print(x[&#39;result&#39;] if x[&#39;success&#39;] else None))</span>
<span class="sd"> results | &#39;print failed search&#39; &gt;&gt; beam.Map(</span>
<span class="sd"> lambda x: print(x[&#39;result&#39;] if not x[&#39;success&#39;] else None))</span>
<span class="sd">In the example above, successful qido search results and error messages for</span>
<span class="sd">failed requests are printed. In practice, users can choose to filter</span>
<span class="sd">those results and send them wherever they want.</span>
<span class="sd">Convert DICOM Pubsub message to Qido search request</span>
<span class="sd">===================================================</span>
<span class="sd">Healthcare API users might read messages from Pubsub to monitor the store</span>
<span class="sd">operations (e.g. a new file) in a DICOM store. Pubsub messages encode</span>
<span class="sd">the DICOM web store path as well as instance ids. If users are interested in</span>
<span class="sd">getting new instance&#39;s metadata, they can use the `FormatToQido` transform</span>
<span class="sd">to convert the message into Qido Search dict then use the `DicomSearch`</span>
<span class="sd">transform. Here is a sample usage:</span>
<span class="sd"> pipeline_options = PipelineOptions()</span>
<span class="sd"> pipeline_options.view_as(StandardOptions).streaming = True</span>
<span class="sd"> p = beam.Pipeline(options=pipeline_options)</span>
<span class="sd"> pubsub = p | beam.io.ReadStringFromPubsub(subscription=&#39;a_dicom_store&#39;)</span>
<span class="sd"> results = pubsub | FormatToQido()</span>
<span class="sd"> success = results | &#39;filter message&#39; &gt;&gt; beam.Filter(lambda x: x[&#39;success&#39;])</span>
<span class="sd"> qido_dict = success | &#39;get qido request&#39; &gt;&gt; beam.Map(lambda x: x[&#39;result&#39;])</span>
<span class="sd"> metadata = qido_dict | DicomSearch()</span>
<span class="sd">In the example above, the pipeline is listening to a pubsub topic and waiting</span>
<span class="sd">for messages from DICOM API. When a new DICOM file comes into the storage, the</span>
<span class="sd">pipeline will receive a pubsub message, convert it to a Qido request dict and</span>
<span class="sd">feed it to DicomSearch() PTransform. As a result, users can get the metadata for</span>
<span class="sd">every new DICOM file. Note that not every pubsub message received is from DICOM</span>
<span class="sd">API, so we need to filter the results first.</span>
<span class="sd">Store a DICOM file in a DICOM storage</span>
<span class="sd">===================================================</span>
<span class="sd">UploadToDicomStore() wraps store request API and users can use it to send a</span>
<span class="sd">DICOM file to a DICOM store. It supports two types of input: 1. file data in</span>
<span class="sd">byte[]; 2. fileio object. Users should set the &#39;input_type&#39; when initializing</span>
<span class="sd">this PTransform. Here are the examples:</span>
<span class="sd"> with Pipeline() as p:</span>
<span class="sd"> input_dict = {&#39;project_id&#39;: &#39;abc123&#39;, &#39;type&#39;: &#39;instances&#39;,...}</span>
<span class="sd"> path = &quot;gs://bucketname/something/a.dcm&quot;</span>
<span class="sd"> match = p | fileio.MatchFiles(path)</span>
<span class="sd"> fileio_obj = match | fileio.ReadAll()</span>
<span class="sd"> results = fileio_obj | UploadToDicomStore(input_dict, &#39;fileio&#39;)</span>
<span class="sd"> with Pipeline() as p:</span>
<span class="sd"> input_dict = {&#39;project_id&#39;: &#39;abc123&#39;, &#39;type&#39;: &#39;instances&#39;,...}</span>
<span class="sd"> f = open(&quot;abc.dcm&quot;, &quot;rb&quot;)</span>
<span class="sd"> dcm_file = f.read()</span>
<span class="sd"> byte_file = p | &#39;create byte file&#39; &gt;&gt; beam.Create([dcm_file])</span>
<span class="sd"> results = byte_file | UploadToDicomStore(input_dict, &#39;bytes&#39;)</span>
<span class="sd">The first example uses a PCollection of fileio objects as input.</span>
<span class="sd">UploadToDicomStore will read DICOM files from the objects and send them</span>
<span class="sd">to a DICOM storage.</span>
<span class="sd">The second example uses a PCollection of byte[] as input. UploadToDicomStore</span>
<span class="sd">will directly send those DICOM files to a DICOM storage.</span>
<span class="sd">Users can also get the operation results in the output PCollection if they want</span>
<span class="sd">to handle the failed store requests.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="c1"># pytype: skip-file</span>
<span class="kn">from</span> <span class="nn">concurrent.futures</span> <span class="kn">import</span> <span class="n">ThreadPoolExecutor</span>
<span class="kn">from</span> <span class="nn">concurrent.futures</span> <span class="kn">import</span> <span class="n">as_completed</span>
<span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span>
<span class="kn">from</span> <span class="nn">apache_beam.io.gcp.dicomclient</span> <span class="kn">import</span> <span class="n">DicomApiHttpClient</span>
<span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">PTransform</span>
<div class="viewcode-block" id="DicomSearch"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.DicomSearch">[docs]</a><span class="k">class</span> <span class="nc">DicomSearch</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A PTransform used for retrieving DICOM instance metadata from Google</span>
<span class="sd"> Cloud DICOM store. It takes a PCollection of dicts as input and returns</span>
<span class="sd"> a PCollection of dict as results:</span>
<span class="sd"> INPUT:</span>
<span class="sd"> The input dict represents DICOM web path parameters, which has the following</span>
<span class="sd"> string keys and values:</span>
<span class="sd"> {</span>
<span class="sd"> &#39;project_id&#39;: str,</span>
<span class="sd"> &#39;region&#39;: str,</span>
<span class="sd"> &#39;dataset_id&#39;: str,</span>
<span class="sd"> &#39;dicom_store_id&#39;: str,</span>
<span class="sd"> &#39;search_type&#39;: str,</span>
<span class="sd"> &#39;params&#39;: dict(str,str) (Optional),</span>
<span class="sd"> }</span>
<span class="sd"> Key-value pairs:</span>
<span class="sd"> project_id: Id of the project in which the DICOM store is</span>
<span class="sd"> located. (Required)</span>
<span class="sd"> region: Region where the DICOM store resides. (Required)</span>
<span class="sd"> dataset_id: Id of the dataset where DICOM store belongs to. (Required)</span>
<span class="sd"> dicom_store_id: Id of the dicom store. (Required)</span>
<span class="sd"> search_type: Which type of search it is, could only be one of the three</span>
<span class="sd"> values: &#39;instances&#39;, &#39;series&#39;, or &#39;studies&#39;. (Required)</span>
<span class="sd"> params: A dict of str:str pairs used to refine QIDO search. (Optional)</span>
<span class="sd"> Supported tags in three categories:</span>
<span class="sd"> 1.Studies:</span>
<span class="sd"> * StudyInstanceUID,</span>
<span class="sd"> * PatientName,</span>
<span class="sd"> * PatientID,</span>
<span class="sd"> * AccessionNumber,</span>
<span class="sd"> * ReferringPhysicianName,</span>
<span class="sd"> * StudyDate,</span>
<span class="sd"> 2.Series: all study level search terms and</span>
<span class="sd"> * SeriesInstanceUID,</span>
<span class="sd"> * Modality,</span>
<span class="sd"> 3.Instances: all study/series level search terms and</span>
<span class="sd"> * SOPInstanceUID,</span>
<span class="sd"> e.g. {&quot;StudyInstanceUID&quot;:&quot;1&quot;,&quot;SeriesInstanceUID&quot;:&quot;2&quot;}</span>
<span class="sd"> OUTPUT:</span>
<span class="sd"> The output dict wraps results as well as error messages:</span>
<span class="sd"> {</span>
<span class="sd"> &#39;result&#39;: a list of dicts in JSON style.</span>
<span class="sd"> &#39;success&#39;: boolean value telling whether the operation is successful.</span>
<span class="sd"> &#39;input&#39;: detail ids and dicomweb path for this retrieval.</span>
<span class="sd"> &#39;status&#39;: status code from the server, or a validation error message if the input dict is malformed.</span>
<span class="sd"> }</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">buffer_size</span><span class="o">=</span><span class="mi">8</span><span class="p">,</span> <span class="n">max_workers</span><span class="o">=</span><span class="mi">5</span><span class="p">,</span> <span class="n">client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes DicomSearch.</span>
<span class="sd"> Args:</span>
<span class="sd"> buffer_size: # type: Int. Size of the request buffer.</span>
<span class="sd"> max_workers: # type: Int. Maximum number of threads a worker can</span>
<span class="sd"> create. If it is set to one, all the requests will be processed</span>
<span class="sd"> sequentially in a worker.</span>
<span class="sd"> client: # type: object. If it is specified, all the Api calls will</span>
<span class="sd"> be made by this client instead of the default one (DicomApiHttpClient).</span>
<span class="sd"> credential: # type: Google credential object, if it is specified, the</span>
<span class="sd"> Http client will use it to create sessions instead of the default.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span> <span class="o">=</span> <span class="n">buffer_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span> <span class="o">=</span> <span class="n">max_workers</span>
<span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span> <span class="ow">or</span> <span class="n">DicomApiHttpClient</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span>
<div class="viewcode-block" id="DicomSearch.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.DicomSearch.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_QidoReadFn</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">credential</span><span class="p">))</span></div></div>
<span class="k">class</span> <span class="nc">_QidoReadFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A DoFn for executing every qido query request.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffer_size</span><span class="p">,</span> <span class="n">max_workers</span><span class="p">,</span> <span class="n">client</span><span class="p">,</span> <span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span> <span class="o">=</span> <span class="n">buffer_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span> <span class="o">=</span> <span class="n">max_workers</span>
<span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_flush</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">item</span>
<span class="k">def</span> <span class="nf">validate_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="c1"># Check if all required keys present.</span>
<span class="n">required_keys</span> <span class="o">=</span> <span class="p">[</span>
<span class="s1">&#39;project_id&#39;</span><span class="p">,</span> <span class="s1">&#39;region&#39;</span><span class="p">,</span> <span class="s1">&#39;dataset_id&#39;</span><span class="p">,</span> <span class="s1">&#39;dicom_store_id&#39;</span><span class="p">,</span> <span class="s1">&#39;search_type&#39;</span>
<span class="p">]</span>
<span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">required_keys</span><span class="p">:</span>
<span class="k">if</span> <span class="n">key</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">element</span><span class="p">:</span>
<span class="n">error_message</span> <span class="o">=</span> <span class="s1">&#39;Must have </span><span class="si">%s</span><span class="s1"> in the dict.&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">key</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">False</span><span class="p">,</span> <span class="n">error_message</span>
<span class="c1"># Check if return type is correct.</span>
<span class="k">if</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;search_type&#39;</span><span class="p">]</span> <span class="ow">in</span> <span class="p">[</span><span class="s1">&#39;instances&#39;</span><span class="p">,</span> <span class="s2">&quot;studies&quot;</span><span class="p">,</span> <span class="s2">&quot;series&quot;</span><span class="p">]:</span>
<span class="k">return</span> <span class="kc">True</span><span class="p">,</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">error_message</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;Search type can only be &quot;studies&quot;, &#39;</span>
<span class="s1">&#39;&quot;instances&quot; or &quot;series&quot;&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">False</span><span class="p">,</span> <span class="n">error_message</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">element</span><span class="p">,</span>
<span class="n">window</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">WindowParam</span><span class="p">,</span>
<span class="n">timestamp</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">TimestampParam</span><span class="p">):</span>
<span class="c1"># Check if the element is valid</span>
<span class="n">valid</span><span class="p">,</span> <span class="n">error_message</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">validate_element</span><span class="p">(</span><span class="n">element</span><span class="p">)</span>
<span class="k">if</span> <span class="n">valid</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="n">element</span><span class="p">,</span> <span class="n">window</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">))</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">)</span> <span class="o">&gt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span><span class="p">:</span>
<span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_flush</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">item</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Return this when the input dict does not meet the requirements</span>
<span class="n">out</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;result&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;status&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">error_message</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;input&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">element</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;success&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">yield</span> <span class="n">out</span>
<span class="k">def</span> <span class="nf">make_request</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="c1"># Sending Qido request to DICOM Api</span>
<span class="n">project_id</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;project_id&#39;</span><span class="p">]</span>
<span class="n">region</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;region&#39;</span><span class="p">]</span>
<span class="n">dataset_id</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;dataset_id&#39;</span><span class="p">]</span>
<span class="n">dicom_store_id</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;dicom_store_id&#39;</span><span class="p">]</span>
<span class="n">search_type</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;search_type&#39;</span><span class="p">]</span>
<span class="n">params</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">&#39;params&#39;</span><span class="p">]</span> <span class="k">if</span> <span class="s1">&#39;params&#39;</span> <span class="ow">in</span> <span class="n">element</span> <span class="k">else</span> <span class="kc">None</span>
<span class="c1"># Call qido search http client</span>
<span class="n">result</span><span class="p">,</span> <span class="n">status_code</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">qido_search</span><span class="p">(</span>
<span class="n">project_id</span><span class="p">,</span> <span class="n">region</span><span class="p">,</span> <span class="n">dataset_id</span><span class="p">,</span> <span class="n">dicom_store_id</span><span class="p">,</span>
<span class="n">search_type</span><span class="p">,</span> <span class="n">params</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">credential</span>
<span class="p">)</span>
<span class="n">out</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;result&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">result</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;status&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">status_code</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;input&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">element</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;success&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span><span class="n">status_code</span> <span class="o">==</span> <span class="mi">200</span><span class="p">)</span>
<span class="k">return</span> <span class="n">out</span>
<span class="k">def</span> <span class="nf">process_buffer_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffer_element</span><span class="p">):</span>
<span class="c1"># Thread job runner - each thread makes a Qido search request</span>
<span class="n">value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">make_request</span><span class="p">(</span><span class="n">buffer_element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="n">windows</span> <span class="o">=</span> <span class="p">[</span><span class="n">buffer_element</span><span class="p">[</span><span class="mi">1</span><span class="p">]]</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="n">buffer_element</span><span class="p">[</span><span class="mi">2</span><span class="p">]</span>
<span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">utils</span><span class="o">.</span><span class="n">windowed_value</span><span class="o">.</span><span class="n">WindowedValue</span><span class="p">(</span>
<span class="n">value</span><span class="o">=</span><span class="n">value</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">timestamp</span><span class="p">,</span> <span class="n">windows</span><span class="o">=</span><span class="n">windows</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_flush</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Process the buffered elements in parallel on a bounded thread pool and</span>
<span class="c1"># yield the resulting WindowedValues in completion order, not input order.</span>
<span class="c1"># The executor is used as a context manager so its worker threads are shut</span>
<span class="c1"># down at the end of every flush instead of being leaked once per flush.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">:</span>
<span class="c1"># Nothing buffered (e.g. an empty bundle): do not create a thread pool.</span>
<span class="k">return</span>
<span class="k">with</span> <span class="n">ThreadPoolExecutor</span><span class="p">(</span><span class="n">max_workers</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span><span class="p">)</span> <span class="k">as</span> <span class="n">executor</span><span class="p">:</span>
<span class="n">futures</span> <span class="o">=</span> <span class="p">[</span>
<span class="n">executor</span><span class="o">.</span><span class="n">submit</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">process_buffer_element</span><span class="p">,</span> <span class="n">ele</span><span class="p">)</span> <span class="k">for</span> <span class="n">ele</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span>
<span class="p">]</span>
<span class="c1"># The submitted futures now own these elements; start a fresh buffer.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">as_completed</span><span class="p">(</span><span class="n">futures</span><span class="p">):</span>
<span class="k">yield</span> <span class="n">f</span><span class="o">.</span><span class="n">result</span><span class="p">()</span>
<div class="viewcode-block" id="FormatToQido"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.FormatToQido">[docs]</a><span class="k">class</span> <span class="nc">FormatToQido</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A PTransform for converting pubsub messages into search input dict.</span>
<span class="sd"> Takes PCollection of string as input and returns a PCollection of dict as</span>
<span class="sd"> results. Note that some pubsub messages may not be from DICOM API, which</span>
<span class="sd"> will be recorded as failed conversions.</span>
<span class="sd"> INPUT:</span>
<span class="sd"> The inputs are normally strings from a Pubsub topic, of the form:</span>
<span class="sd"> &quot;projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/</span>
<span class="sd"> dicomStores/DICOM_STORE_ID/dicomWeb/studies/STUDY_UID/</span>
<span class="sd"> series/SERIES_UID/instances/INSTANCE_UID&quot;</span>
<span class="sd"> Any string that does not have exactly this structure is reported as a</span>
<span class="sd"> failed conversion.</span>
<span class="sd"> OUTPUT:</span>
<span class="sd"> Exactly one output dict is produced per input; it encodes results as well</span>
<span class="sd"> as error messages:</span>
<span class="sd"> {</span>
<span class="sd"> &#39;result&#39;: a dict representing instance level qido search request</span>
<span class="sd"> (an empty dict when the conversion failed).</span>
<span class="sd"> &#39;success&#39;: boolean value telling whether the conversion is successful.</span>
<span class="sd"> &#39;input&#39;: input pubsub message string.</span>
<span class="sd"> }</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes FormatToQido.</span>
<span class="sd"> Args:</span>
<span class="sd"> credential: # type: Google credential object. It is stored on the</span>
<span class="sd"> transform but currently unused: the string-to-qido conversion makes</span>
<span class="sd"> no HTTP requests, so no client ever receives it.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span>
<div class="viewcode-block" id="FormatToQido.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.FormatToQido.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="c1"># Pure string parsing; each input string yields exactly one result dict.</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_ConvertStringToQido</span><span class="p">())</span></div></div>
<span class="k">class</span> <span class="nc">_ConvertStringToQido</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A DoFn for converting pubsub string to qido search parameters.&quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
<span class="c1"># A well-formed DICOM pubsub message splits on &#39;/&#39; into 15 entries:</span>
<span class="c1">#   projects/P/locations/L/datasets/D/dicomStores/S/dicomWeb/</span>
<span class="c1">#   studies/ST/series/SE/instances/I</span>
<span class="c1"># i.e. fixed keywords interleaved with values, plus the value-less</span>
<span class="c1"># &#39;dicomWeb&#39; keyword. Anything else is reported as a failed conversion.</span>
<span class="n">EXPECTED_NUM_ENTRIES</span> <span class="o">=</span> <span class="mi">15</span>
<span class="n">EXPECTED_KEYWORDS</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;projects&#39;</span><span class="p">,</span>
<span class="s1">&#39;locations&#39;</span><span class="p">,</span>
<span class="s1">&#39;datasets&#39;</span><span class="p">,</span>
<span class="s1">&#39;dicomStores&#39;</span><span class="p">,</span>
<span class="s1">&#39;dicomWeb&#39;</span><span class="p">,</span>
<span class="s1">&#39;studies&#39;</span><span class="p">,</span>
<span class="s1">&#39;series&#39;</span><span class="p">,</span>
<span class="s1">&#39;instances&#39;</span><span class="p">)</span>
<span class="n">parts</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;/&#39;</span><span class="p">)</span>
<span class="c1"># Record returned for any malformed message; it keeps the raw input.</span>
<span class="n">failure</span> <span class="o">=</span> <span class="p">{</span><span class="s1">&#39;result&#39;</span><span class="p">:</span> <span class="p">{},</span> <span class="s1">&#39;input&#39;</span><span class="p">:</span> <span class="n">element</span><span class="p">,</span> <span class="s1">&#39;success&#39;</span><span class="p">:</span> <span class="kc">False</span><span class="p">}</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">parts</span><span class="p">)</span> <span class="o">!=</span> <span class="n">EXPECTED_NUM_ENTRIES</span><span class="p">:</span>
<span class="k">return</span> <span class="p">[</span><span class="n">failure</span><span class="p">]</span>
<span class="c1"># Safe to unpack: the entry count was checked above.</span>
<span class="p">(</span><span class="n">projects_kw</span><span class="p">,</span> <span class="n">project_id</span><span class="p">,</span> <span class="n">locations_kw</span><span class="p">,</span> <span class="n">region</span><span class="p">,</span>
<span class="n">datasets_kw</span><span class="p">,</span> <span class="n">dataset_id</span><span class="p">,</span> <span class="n">dicom_stores_kw</span><span class="p">,</span> <span class="n">dicom_store_id</span><span class="p">,</span>
<span class="n">dicom_web_kw</span><span class="p">,</span> <span class="n">studies_kw</span><span class="p">,</span> <span class="n">study_uid</span><span class="p">,</span> <span class="n">series_kw</span><span class="p">,</span>
<span class="n">series_uid</span><span class="p">,</span> <span class="n">instances_kw</span><span class="p">,</span> <span class="n">instance_uid</span><span class="p">)</span> <span class="o">=</span> <span class="n">parts</span>
<span class="n">found_keywords</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">projects_kw</span><span class="p">,</span> <span class="n">locations_kw</span><span class="p">,</span> <span class="n">datasets_kw</span><span class="p">,</span> <span class="n">dicom_stores_kw</span><span class="p">,</span>
<span class="n">dicom_web_kw</span><span class="p">,</span> <span class="n">studies_kw</span><span class="p">,</span> <span class="n">series_kw</span><span class="p">,</span> <span class="n">instances_kw</span><span class="p">)</span>
<span class="c1"># Every keyword must appear at its expected position.</span>
<span class="k">if</span> <span class="n">found_keywords</span> <span class="o">!=</span> <span class="n">EXPECTED_KEYWORDS</span><span class="p">:</span>
<span class="k">return</span> <span class="p">[</span><span class="n">failure</span><span class="p">]</span>
<span class="n">qido_request</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;project_id&#39;</span><span class="p">:</span> <span class="n">project_id</span><span class="p">,</span>
<span class="s1">&#39;region&#39;</span><span class="p">:</span> <span class="n">region</span><span class="p">,</span>
<span class="s1">&#39;dataset_id&#39;</span><span class="p">:</span> <span class="n">dataset_id</span><span class="p">,</span>
<span class="s1">&#39;dicom_store_id&#39;</span><span class="p">:</span> <span class="n">dicom_store_id</span><span class="p">,</span>
<span class="s1">&#39;search_type&#39;</span><span class="p">:</span> <span class="s1">&#39;instances&#39;</span><span class="p">,</span>
<span class="c1"># Instance-level search parameters for the qido request.</span>
<span class="s1">&#39;params&#39;</span><span class="p">:</span> <span class="p">{</span>
<span class="s1">&#39;StudyInstanceUID&#39;</span><span class="p">:</span> <span class="n">study_uid</span><span class="p">,</span>
<span class="s1">&#39;SeriesInstanceUID&#39;</span><span class="p">:</span> <span class="n">series_uid</span><span class="p">,</span>
<span class="s1">&#39;SOPInstanceUID&#39;</span><span class="p">:</span> <span class="n">instance_uid</span><span class="p">,</span>
<span class="p">},</span>
<span class="p">}</span>
<span class="k">return</span> <span class="p">[{</span><span class="s1">&#39;result&#39;</span><span class="p">:</span> <span class="n">qido_request</span><span class="p">,</span> <span class="s1">&#39;input&#39;</span><span class="p">:</span> <span class="n">element</span><span class="p">,</span> <span class="s1">&#39;success&#39;</span><span class="p">:</span> <span class="kc">True</span><span class="p">}]</span>
<div class="viewcode-block" id="UploadToDicomStore"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.UploadToDicomStore">[docs]</a><span class="k">class</span> <span class="nc">UploadToDicomStore</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A PTransform for storing instances to a DICOM store.</span>
<span class="sd"> Takes a PCollection of DICOM files as input and returns a PCollection of</span>
<span class="sd"> dicts as results. Each input is either a DICOM file in bytes or a fileio</span>
<span class="sd"> object, as selected by input_type.</span>
<span class="sd"> INPUT:</span>
<span class="sd"> This PTransform supports two types of input:</span>
<span class="sd"> 1. Byte[]: representing dicom file.</span>
<span class="sd"> 2. Fileio object: stream file object whose open() returns a readable file.</span>
<span class="sd"> OUTPUT:</span>
<span class="sd"> The output dict encodes status as well as error messages:</span>
<span class="sd"> {</span>
<span class="sd"> &#39;success&#39;: boolean value telling whether the store is successful.</span>
<span class="sd"> &#39;input&#39;: undeliverable data. Exactly the same as the input,</span>
<span class="sd"> only set if the operation failed.</span>
<span class="sd"> &#39;status&#39;: status code from the server, or the exception raised while</span>
<span class="sd"> reading the input; used as error messages.</span>
<span class="sd"> }</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">destination_dict</span><span class="p">,</span>
<span class="n">input_type</span><span class="p">,</span>
<span class="n">buffer_size</span><span class="o">=</span><span class="mi">8</span><span class="p">,</span>
<span class="n">max_workers</span><span class="o">=</span><span class="mi">5</span><span class="p">,</span>
<span class="n">client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Initializes UploadToDicomStore.</span>
<span class="sd"> Args:</span>
<span class="sd"> destination_dict: # type: python dict, encodes DICOM endpoint information:</span>
<span class="sd"> {</span>
<span class="sd"> &#39;project_id&#39;: str,</span>
<span class="sd"> &#39;region&#39;: str,</span>
<span class="sd"> &#39;dataset_id&#39;: str,</span>
<span class="sd"> &#39;dicom_store_id&#39;: str,</span>
<span class="sd"> }</span>
<span class="sd"> Key-value pairs:</span>
<span class="sd"> * project_id: Id of the project in which the DICOM store is located. (Required)</span>
<span class="sd"> * region: Region where the DICOM store resides. (Required)</span>
<span class="sd"> * dataset_id: Id of the dataset the DICOM store belongs to. (Required)</span>
<span class="sd"> * dicom_store_id: Id of the dicom store. (Required)</span>
<span class="sd"> A missing key raises ValueError when the transform is applied.</span>
<span class="sd"> input_type: # type: string, must be &#39;bytes&#39; or &#39;fileio&#39;; any other</span>
<span class="sd"> value raises ValueError.</span>
<span class="sd"> buffer_size: # type: Int. Number of elements buffered before they are</span>
<span class="sd"> uploaded together as one batch.</span>
<span class="sd"> max_workers: # type: Int. Maximum number of threads a worker can</span>
<span class="sd"> create. If it is set to one, all the requests will be processed</span>
<span class="sd"> sequentially in a worker.</span>
<span class="sd"> client: # type: object. If it is specified, all the Api calls will be</span>
<span class="sd"> made by this client instead of the default one (DicomApiHttpClient).</span>
<span class="sd"> credential: # type: Google credential object, if it is specified, the</span>
<span class="sd"> Http client will use it instead of the default one.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span> <span class="o">=</span> <span class="n">destination_dict</span>
<span class="c1"># input_type pre-check</span>
<span class="k">if</span> <span class="n">input_type</span> <span class="ow">not</span> <span class="ow">in</span> <span class="p">[</span><span class="s1">&#39;bytes&#39;</span><span class="p">,</span> <span class="s1">&#39;fileio&#39;</span><span class="p">]:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;input_type could only be &#39;bytes&#39; or &#39;fileio&#39;&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">input_type</span> <span class="o">=</span> <span class="n">input_type</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span> <span class="o">=</span> <span class="n">buffer_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span> <span class="o">=</span> <span class="n">max_workers</span>
<span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span>
<div class="viewcode-block" id="UploadToDicomStore.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.UploadToDicomStore.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
<span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
<span class="n">_StoreInstance</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">input_type</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">credential</span><span class="p">))</span></div></div>
<span class="k">class</span> <span class="nc">_StoreInstance</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;A DoFn that reads or fetches DICOM files and pushes them to a DICOM store.</span>
<span class="sd"> Elements are buffered within a bundle and uploaded in batches of up to</span>
<span class="sd"> buffer_size elements, on a thread pool of at most max_workers threads.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">destination_dict</span><span class="p">,</span>
<span class="n">input_type</span><span class="p">,</span>
<span class="n">buffer_size</span><span class="p">,</span>
<span class="n">max_workers</span><span class="p">,</span>
<span class="n">client</span><span class="p">,</span>
<span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="c1"># pre-check destination dict</span>
<span class="n">required_keys</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;project_id&#39;</span><span class="p">,</span> <span class="s1">&#39;region&#39;</span><span class="p">,</span> <span class="s1">&#39;dataset_id&#39;</span><span class="p">,</span> <span class="s1">&#39;dicom_store_id&#39;</span><span class="p">]</span>
<span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">required_keys</span><span class="p">:</span>
<span class="k">if</span> <span class="n">key</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">destination_dict</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">&#39;Must have </span><span class="si">%s</span><span class="s1"> in the dict.&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">key</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span> <span class="o">=</span> <span class="n">destination_dict</span>
<span class="bp">self</span><span class="o">.</span><span class="n">input_type</span> <span class="o">=</span> <span class="n">input_type</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span> <span class="o">=</span> <span class="n">buffer_size</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span> <span class="o">=</span> <span class="n">max_workers</span>
<span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span>
<span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Upload whatever is still buffered when the bundle ends.</span>
<span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_flush</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">item</span>
<span class="k">def</span> <span class="nf">process</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">element</span><span class="p">,</span>
<span class="n">window</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">WindowParam</span><span class="p">,</span>
<span class="n">timestamp</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">TimestampParam</span><span class="p">):</span>
<span class="c1"># Keep each element&#39;s window and timestamp so its upload result can be</span>
<span class="c1"># re-emitted as a WindowedValue in the same window after the batch runs.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="n">element</span><span class="p">,</span> <span class="n">window</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">))</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">)</span> <span class="o">&gt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span><span class="p">:</span>
<span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_flush</span><span class="p">():</span>
<span class="k">yield</span> <span class="n">item</span>
<span class="k">def</span> <span class="nf">make_request</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dicom_file</span><span class="p">):</span>
<span class="c1"># Send one DICOM file to the store and record the returned status code.</span>
<span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">[</span><span class="s1">&#39;project_id&#39;</span><span class="p">]</span>
<span class="n">region</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">[</span><span class="s1">&#39;region&#39;</span><span class="p">]</span>
<span class="n">dataset_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">[</span><span class="s1">&#39;dataset_id&#39;</span><span class="p">]</span>
<span class="n">dicom_store_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">[</span><span class="s1">&#39;dicom_store_id&#39;</span><span class="p">]</span>
<span class="c1"># Use the injected client if one was given, otherwise the default one.</span>
<span class="n">client</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="ow">or</span> <span class="n">DicomApiHttpClient</span><span class="p">()</span>
<span class="n">_</span><span class="p">,</span> <span class="n">status_code</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="n">dicomweb_store_instance</span><span class="p">(</span>
<span class="n">project_id</span><span class="p">,</span> <span class="n">region</span><span class="p">,</span> <span class="n">dataset_id</span><span class="p">,</span> <span class="n">dicom_store_id</span><span class="p">,</span> <span class="n">dicom_file</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">credential</span>
<span class="p">)</span>
<span class="n">out</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;status&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">status_code</span>
<span class="n">out</span><span class="p">[</span><span class="s1">&#39;success&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span><span class="n">status_code</span> <span class="o">==</span> <span class="mi">200</span><span class="p">)</span>
<span class="k">return</span> <span class="n">out</span>
<span class="k">def</span> <span class="nf">read_dicom_file</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffer_element</span><span class="p">):</span>
<span class="c1"># Read the file based on the input type. Returns (True, data) on success.</span>
<span class="c1"># If the read fails, returns (False, error_dict) where error_dict records</span>
<span class="c1"># the raised exception under &#39;status&#39; and sets &#39;success&#39; to False.</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">input_type</span> <span class="o">==</span> <span class="s1">&#39;fileio&#39;</span><span class="p">:</span>
<span class="n">f</span> <span class="o">=</span> <span class="n">buffer_element</span><span class="o">.</span><span class="n">open</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">data</span> <span class="o">=</span> <span class="n">f</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
<span class="k">finally</span><span class="p">:</span>
<span class="c1"># Close the handle even if read() raises, so failed reads do not</span>
<span class="c1"># leak open files.</span>
<span class="n">f</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
<span class="k">return</span> <span class="kc">True</span><span class="p">,</span> <span class="n">data</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span><span class="p">,</span> <span class="n">buffer_element</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">error_message</span><span class="p">:</span>
<span class="n">error_out</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">error_out</span><span class="p">[</span><span class="s1">&#39;status&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">error_message</span>
<span class="n">error_out</span><span class="p">[</span><span class="s1">&#39;success&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">return</span> <span class="kc">False</span><span class="p">,</span> <span class="n">error_out</span>
<span class="k">def</span> <span class="nf">process_buffer_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffer_element</span><span class="p">):</span>
<span class="c1"># Thread job runner - each thread stores one buffered DICOM file.</span>
<span class="c1"># buffer_element is the (element, window, timestamp) tuple built in process.</span>
<span class="n">success</span><span class="p">,</span> <span class="n">read_result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">read_dicom_file</span><span class="p">(</span><span class="n">buffer_element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="n">windows</span> <span class="o">=</span> <span class="p">[</span><span class="n">buffer_element</span><span class="p">[</span><span class="mi">1</span><span class="p">]]</span>
<span class="n">timestamp</span> <span class="o">=</span> <span class="n">buffer_element</span><span class="p">[</span><span class="mi">2</span><span class="p">]</span>
<span class="c1"># Upload only if the read succeeded; otherwise read_result is already the</span>
<span class="c1"># error dict produced by read_dicom_file.</span>
<span class="n">value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">make_request</span><span class="p">(</span><span class="n">read_result</span><span class="p">)</span> <span class="k">if</span> <span class="n">success</span> <span class="k">else</span> <span class="n">read_result</span>
<span class="c1"># Attach the original input to failed results (the undeliverable data).</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">value</span><span class="p">[</span><span class="s1">&#39;success&#39;</span><span class="p">]:</span>
<span class="n">value</span><span class="p">[</span><span class="s1">&#39;input&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">buffer_element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">utils</span><span class="o">.</span><span class="n">windowed_value</span><span class="o">.</span><span class="n">WindowedValue</span><span class="p">(</span>
<span class="n">value</span><span class="o">=</span><span class="n">value</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">timestamp</span><span class="p">,</span> <span class="n">windows</span><span class="o">=</span><span class="n">windows</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_flush</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Process the buffered elements in parallel on a bounded thread pool and</span>
<span class="c1"># yield the resulting WindowedValues in completion order, not input order.</span>
<span class="c1"># The executor is used as a context manager so its worker threads are shut</span>
<span class="c1"># down at the end of every flush instead of being leaked once per flush.</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">:</span>
<span class="c1"># Nothing buffered (e.g. an empty bundle): do not create a thread pool.</span>
<span class="k">return</span>
<span class="k">with</span> <span class="n">ThreadPoolExecutor</span><span class="p">(</span><span class="n">max_workers</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span><span class="p">)</span> <span class="k">as</span> <span class="n">executor</span><span class="p">:</span>
<span class="n">futures</span> <span class="o">=</span> <span class="p">[</span>
<span class="n">executor</span><span class="o">.</span><span class="n">submit</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">process_buffer_element</span><span class="p">,</span> <span class="n">ele</span><span class="p">)</span> <span class="k">for</span> <span class="n">ele</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span>
<span class="p">]</span>
<span class="c1"># The submitted futures now own these elements; start a fresh buffer.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">as_completed</span><span class="p">(</span><span class="n">futures</span><span class="p">):</span>
<span class="k">yield</span> <span class="n">f</span><span class="o">.</span><span class="n">result</span><span class="p">()</span>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
&copy; Copyright
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/readthedocs/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>