| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>apache_beam.io.gcp.dicomio — Apache Beam 2.38.0 documentation</title> |
| |
| |
| |
| |
| |
| |
| |
| |
| <script type="text/javascript" src="../../../../_static/js/modernizr.min.js"></script> |
| |
| |
| <script type="text/javascript" id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script> |
| <script type="text/javascript" src="../../../../_static/jquery.js"></script> |
| <script type="text/javascript" src="../../../../_static/underscore.js"></script> |
| <script type="text/javascript" src="../../../../_static/doctools.js"></script> |
| <script type="text/javascript" src="../../../../_static/language_data.js"></script> |
| <script async="async" type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/latest.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| |
| <script type="text/javascript" src="../../../../_static/js/theme.js"></script> |
| |
| |
| |
| |
| <link rel="stylesheet" href="../../../../_static/css/theme.css" type="text/css" /> |
| <link rel="stylesheet" href="../../../../_static/pygments.css" type="text/css" /> |
| <link rel="index" title="Index" href="../../../../genindex.html" /> |
| <link rel="search" title="Search" href="../../../../search.html" /> |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search" > |
| |
| |
| |
| <a href="../../../../index.html" class="icon icon-home"> Apache Beam |
| |
| |
| |
| </a> |
| |
| |
| |
| |
| <div class="version"> |
| 2.38.0 |
| </div> |
| |
| |
| |
| |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get"> |
| <input type="text" name="q" placeholder="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| |
| |
| |
| |
| |
| |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.coders.html">apache_beam.coders package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.dataframe.html">apache_beam.dataframe package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.io.html">apache_beam.io package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.metrics.html">apache_beam.metrics package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.ml.html">apache_beam.ml package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.options.html">apache_beam.options package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.portability.html">apache_beam.portability package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.runners.html">apache_beam.runners package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.transforms.html">apache_beam.transforms package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.typehints.html">apache_beam.typehints package</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.utils.html">apache_beam.utils package</a></li> |
| </ul> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.error.html">apache_beam.error module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pipeline.html">apache_beam.pipeline module</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../apache_beam.pvalue.html">apache_beam.pvalue module</a></li> |
| </ul> |
| |
| |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../../index.html">Apache Beam</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| |
| <ul class="wy-breadcrumbs"> |
| |
| <li><a href="../../../../index.html">Docs</a> »</li> |
| |
| <li><a href="../../../index.html">Module code</a> »</li> |
| |
| <li>apache_beam.io.gcp.dicomio</li> |
| |
| |
| <li class="wy-breadcrumbs-aside"> |
| |
| </li> |
| |
| </ul> |
| |
| |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for apache_beam.io.gcp.dicomio</h1><div class="highlight"><pre> |
| <span></span><span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span> |
| <span class="c1"># contributor license agreements. See the NOTICE file distributed with</span> |
| <span class="c1"># this work for additional information regarding copyright ownership.</span> |
| <span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span> |
| <span class="c1"># (the "License"); you may not use this file except in compliance with</span> |
| <span class="c1"># the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing, software</span> |
| <span class="c1"># distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="c1"># See the License for the specific language governing permissions and</span> |
| <span class="c1"># limitations under the License.</span> |
| <span class="c1">#</span> |
| |
| <span class="sd">"""DICOM IO connector</span> |
| <span class="sd">This module implements several tools to facilitate the interaction between</span> |
| <span class="sd">a Google Cloud Healthcare DICOM store and a Beam pipeline.</span> |
| |
| <span class="sd">For more details on DICOM store and API:</span> |
| <span class="sd">https://cloud.google.com/healthcare/docs/how-tos/dicom</span> |
| |
| <span class="sd">The DICOM IO connector can be used to search metadata or write DICOM files</span> |
| <span class="sd">to DICOM store.</span> |
| |
| <span class="sd">When used together with Google Pubsub message connector, the</span> |
| <span class="sd">`FormatToQido` PTransform implemented in this module can be used</span> |
| <span class="sd">to convert Pubsub messages to search requests.</span> |
| |
| <span class="sd">Since Traceability is crucial for healthcare</span> |
| <span class="sd">API users, every input or error message will be recorded in the output of</span> |
| <span class="sd">the DICOM IO connector. As a result, every PTransform in this module will</span> |
| <span class="sd">return a PCollection of dict that encodes results and detailed error messages.</span> |
| |
| <span class="sd">Search instance's metadata (QIDO request)</span> |
| <span class="sd">===================================================</span> |
| <span class="sd">DicomSearch() wraps the QIDO request client and supports 3 levels of search.</span> |
| <span class="sd">Users should specify the level by setting the 'search_type' entry in the input</span> |
| <span class="sd">dict. They can also refine the search by adding tags to filter the results using</span> |
| <span class="sd">the 'params' entry. Here is a sample usage:</span> |
| |
| <span class="sd"> with Pipeline() as p:</span> |
| <span class="sd"> input_dict = p | beam.Create(</span> |
| <span class="sd"> [{'project_id': 'abc123', 'search_type': 'instances',...},</span> |
| <span class="sd"> {'project_id': 'dicom_go', 'search_type': 'series',...}])</span> |
| |
| <span class="sd"> results = input_dict | io.gcp.DicomSearch()</span> |
| <span class="sd"> results | 'print successful search' >> beam.Map(</span> |
| <span class="sd"> lambda x: print(x['result'] if x['success'] else None))</span> |
| |
| <span class="sd"> results | 'print failed search' >> beam.Map(</span> |
| <span class="sd"> lambda x: print(x['result'] if not x['success'] else None))</span> |
| |
| <span class="sd">In the example above, successful qido search results and error messages for</span> |
| <span class="sd">failed requests are printed. When used in real life, user can choose to filter</span> |
| <span class="sd">those data and output them to wherever they want.</span> |
| |
| <span class="sd">Convert DICOM Pubsub message to Qido search request</span> |
| <span class="sd">===================================================</span> |
| <span class="sd">Healthcare API users might read messages from Pubsub to monitor the store</span> |
| <span class="sd">operations (e.g. new file) in a DICOM storage. Pubsub messages encode</span> |
| <span class="sd">DICOM as a web store path as well as instance ids. If users are interested in</span> |
| <span class="sd">getting new instance's metadata, they can use the `FormatToQido` transform</span> |
| <span class="sd">to convert the message into Qido Search dict then use the `DicomSearch`</span> |
| <span class="sd">transform. Here is a sample usage:</span> |
| |
| <span class="sd"> pipeline_options = PipelineOptions()</span> |
| <span class="sd"> pipeline_options.view_as(StandardOptions).streaming = True</span> |
| <span class="sd"> p = beam.Pipeline(options=pipeline_options)</span> |
| <span class="sd"> pubsub = p | beam.io.ReadStringFromPubsub(subscription='a_dicom_store')</span> |
| <span class="sd"> results = pubsub | FormatToQido()</span> |
| <span class="sd"> success = results | 'filter message' >> beam.Filter(lambda x: x['success'])</span> |
| <span class="sd"> qido_dict = success | 'get qido request' >> beam.Map(lambda x: x['result'])</span> |
| <span class="sd"> metadata = qido_dict | DicomSearch()</span> |
| |
| <span class="sd">In the example above, the pipeline is listening to a pubsub topic and waiting</span> |
| <span class="sd">for messages from DICOM API. When a new DICOM file comes into the storage, the</span> |
| <span class="sd">pipeline will receive a pubsub message, convert it to a Qido request dict and</span> |
| <span class="sd">feed it to DicomSearch() PTransform. As a result, users can get the metadata for</span> |
| <span class="sd">every new DICOM file. Note that not every pubsub message received is from DICOM</span> |
| <span class="sd">API, so we need to filter the results first.</span> |
| |
| <span class="sd">Store a DICOM file in a DICOM storage</span> |
| <span class="sd">===================================================</span> |
| <span class="sd">UploadToDicomStore() wraps store request API and users can use it to send a</span> |
| <span class="sd">DICOM file to a DICOM store. It supports two types of input: 1.file data in</span> |
| <span class="sd">byte[] 2.fileio object. Users should set the 'input_type' when initializing</span> |
| <span class="sd">this PTransform. Here are the examples:</span> |
| |
| <span class="sd"> with Pipeline() as p:</span> |
| <span class="sd"> input_dict = {'project_id': 'abc123', 'type': 'instances',...}</span> |
| <span class="sd"> path = "gcs://bucketname/something/a.dcm"</span> |
| <span class="sd"> match = p | fileio.MatchFiles(path)</span> |
| <span class="sd"> fileio_obj = match | fileio.ReadAll()</span> |
| <span class="sd"> results = fileio_obj | UploadToDicomStore(input_dict, 'fileio')</span> |
| |
| <span class="sd"> with Pipeline() as p:</span> |
| <span class="sd"> input_dict = {'project_id': 'abc123', 'type': 'instances',...}</span> |
| <span class="sd"> f = open("abc.dcm", "rb")</span> |
| <span class="sd"> dcm_file = f.read()</span> |
| <span class="sd"> byte_file = p | 'create byte file' >> beam.Create([dcm_file])</span> |
| <span class="sd"> results = byte_file | UploadToDicomStore(input_dict, 'bytes')</span> |
| |
| <span class="sd">The first example uses a PCollection of fileio objects as input.</span> |
| <span class="sd">UploadToDicomStore will read DICOM files from the objects and send them</span> |
| <span class="sd">to a DICOM storage.</span> |
| <span class="sd">The second example uses a PCollection of byte[] as input. UploadToDicomStore</span> |
| <span class="sd">will directly send those DICOM files to a DICOM storage.</span> |
| <span class="sd">Users can also get the operation results in the output PCollection if they want</span> |
| <span class="sd">to handle the failed store requests.</span> |
| <span class="sd">"""</span> |
| |
| <span class="c1"># pytype: skip-file</span> |
| <span class="kn">from</span> <span class="nn">concurrent.futures</span> <span class="kn">import</span> <span class="n">ThreadPoolExecutor</span> |
| <span class="kn">from</span> <span class="nn">concurrent.futures</span> <span class="kn">import</span> <span class="n">as_completed</span> |
| |
| <span class="kn">import</span> <span class="nn">apache_beam</span> <span class="k">as</span> <span class="nn">beam</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.io.gcp.dicomclient</span> <span class="kn">import</span> <span class="n">DicomApiHttpClient</span> |
| <span class="kn">from</span> <span class="nn">apache_beam.transforms</span> <span class="kn">import</span> <span class="n">PTransform</span> |
| |
| |
| <div class="viewcode-block" id="DicomSearch"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.DicomSearch">[docs]</a><span class="k">class</span> <span class="nc">DicomSearch</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="sd">"""A PTransform used for retrieving DICOM instance metadata from Google</span> |
| <span class="sd"> Cloud DICOM store. It takes a PCollection of dicts as input and returns</span> |
| <span class="sd"> a PCollection of dict as results:</span> |
| <span class="sd"> INPUT:</span> |
| <span class="sd"> The input dict represents DICOM web path parameters, which has the following</span> |
| <span class="sd"> string keys and values:</span> |
| <span class="sd"> {</span> |
| <span class="sd"> 'project_id': str,</span> |
| <span class="sd"> 'region': str,</span> |
| <span class="sd"> 'dataset_id': str,</span> |
| <span class="sd"> 'dicom_store_id': str,</span> |
| <span class="sd"> 'search_type': str,</span> |
| <span class="sd"> 'params': dict(str,str) (Optional),</span> |
| <span class="sd"> }</span> |
| |
| <span class="sd"> Key-value pairs:</span> |
| <span class="sd"> project_id: Id of the project in which the DICOM store is</span> |
| <span class="sd"> located. (Required)</span> |
| <span class="sd"> region: Region where the DICOM store resides. (Required)</span> |
| <span class="sd"> dataset_id: Id of the dataset where DICOM store belongs to. (Required)</span> |
| <span class="sd"> dicom_store_id: Id of the dicom store. (Required)</span> |
| <span class="sd"> search_type: Which type of search it is, could only be one of the three</span> |
| <span class="sd"> values: 'instances', 'series', or 'studies'. (Required)</span> |
| <span class="sd"> params: A dict of str:str pairs used to refine QIDO search. (Optional)</span> |
| <span class="sd"> Supported tags in three categories:</span> |
| <span class="sd"> 1.Studies:</span> |
| <span class="sd"> * StudyInstanceUID,</span> |
| <span class="sd"> * PatientName,</span> |
| <span class="sd"> * PatientID,</span> |
| <span class="sd"> * AccessionNumber,</span> |
| <span class="sd"> * ReferringPhysicianName,</span> |
| <span class="sd"> * StudyDate,</span> |
| <span class="sd"> 2.Series: all study level search terms and</span> |
| <span class="sd"> * SeriesInstanceUID,</span> |
| <span class="sd"> * Modality,</span> |
| <span class="sd"> 3.Instances: all study/series level search terms and</span> |
| <span class="sd"> * SOPInstanceUID,</span> |
| |
| <span class="sd"> e.g. {"StudyInstanceUID":"1","SeriesInstanceUID":"2"}</span> |
| |
| <span class="sd"> OUTPUT:</span> |
| <span class="sd"> The output dict wraps results as well as error messages:</span> |
| <span class="sd"> {</span> |
| <span class="sd"> 'result': a list of dicts in JSON style.</span> |
| <span class="sd"> 'success': boolean value telling whether the operation is successful.</span> |
| <span class="sd"> 'input': detail ids and dicomweb path for this retrieval.</span> |
| <span class="sd"> 'status': status code from the server, used as error message.</span> |
| <span class="sd"> }</span> |
| |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">buffer_size</span><span class="o">=</span><span class="mi">8</span><span class="p">,</span> <span class="n">max_workers</span><span class="o">=</span><span class="mi">5</span><span class="p">,</span> <span class="n">client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""Initializes DicomSearch.</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> buffer_size: # type: Int. Size of the request buffer.</span> |
| <span class="sd"> max_workers: # type: Int. Maximum number of threads a worker can</span> |
| <span class="sd"> create. If it is set to one, all the requests will be processed</span> |
| <span class="sd"> sequentially in a worker.</span> |
| <span class="sd"> client: # type: object. If it is specified, all the Api calls will</span> |
| <span class="sd"> be made by this client instead of the default one (DicomApiHttpClient).</span> |
| <span class="sd"> credential: # type: Google credential object, if it is specified, the</span> |
| <span class="sd"> Http client will use it to create sessions instead of the default.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span> <span class="o">=</span> <span class="n">buffer_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span> <span class="o">=</span> <span class="n">max_workers</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span> <span class="ow">or</span> <span class="n">DicomApiHttpClient</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span> |
| |
| <div class="viewcode-block" id="DicomSearch.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.DicomSearch.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_QidoReadFn</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">credential</span><span class="p">))</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">_QidoReadFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""A DoFn for executing every qido query request."""</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffer_size</span><span class="p">,</span> <span class="n">max_workers</span><span class="p">,</span> <span class="n">client</span><span class="p">,</span> <span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span> <span class="o">=</span> <span class="n">buffer_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span> <span class="o">=</span> <span class="n">max_workers</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span> |
| |
| <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span> |
| |
| <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_flush</span><span class="p">():</span> |
| <span class="k">yield</span> <span class="n">item</span> |
| |
| <span class="k">def</span> <span class="nf">validate_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="c1"># Check if all required keys present.</span> |
| <span class="n">required_keys</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="s1">'project_id'</span><span class="p">,</span> <span class="s1">'region'</span><span class="p">,</span> <span class="s1">'dataset_id'</span><span class="p">,</span> <span class="s1">'dicom_store_id'</span><span class="p">,</span> <span class="s1">'search_type'</span> |
| <span class="p">]</span> |
| |
| <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">required_keys</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">key</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">element</span><span class="p">:</span> |
| <span class="n">error_message</span> <span class="o">=</span> <span class="s1">'Must have </span><span class="si">%s</span><span class="s1"> in the dict.'</span> <span class="o">%</span> <span class="p">(</span><span class="n">key</span><span class="p">)</span> |
| <span class="k">return</span> <span class="kc">False</span><span class="p">,</span> <span class="n">error_message</span> |
| |
| <span class="c1"># Check if return type is correct.</span> |
| <span class="k">if</span> <span class="n">element</span><span class="p">[</span><span class="s1">'search_type'</span><span class="p">]</span> <span class="ow">in</span> <span class="p">[</span><span class="s1">'instances'</span><span class="p">,</span> <span class="s2">"studies"</span><span class="p">,</span> <span class="s2">"series"</span><span class="p">]:</span> |
| <span class="k">return</span> <span class="kc">True</span><span class="p">,</span> <span class="kc">None</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">error_message</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="s1">'Search type can only be "studies", '</span> |
| <span class="s1">'"instances" or "series"'</span><span class="p">)</span> |
| <span class="k">return</span> <span class="kc">False</span><span class="p">,</span> <span class="n">error_message</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">element</span><span class="p">,</span> |
| <span class="n">window</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">WindowParam</span><span class="p">,</span> |
| <span class="n">timestamp</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">TimestampParam</span><span class="p">):</span> |
| <span class="c1"># Check if the element is valid</span> |
| <span class="n">valid</span><span class="p">,</span> <span class="n">error_message</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">validate_element</span><span class="p">(</span><span class="n">element</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">valid</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="n">element</span><span class="p">,</span> <span class="n">window</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">))</span> |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">)</span> <span class="o">>=</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_flush</span><span class="p">():</span> |
| <span class="k">yield</span> <span class="n">item</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># Return this when the input dict does not meet the requirements</span> |
| <span class="n">out</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'result'</span><span class="p">]</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'status'</span><span class="p">]</span> <span class="o">=</span> <span class="n">error_message</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'input'</span><span class="p">]</span> <span class="o">=</span> <span class="n">element</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'success'</span><span class="p">]</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="k">yield</span> <span class="n">out</span> |
| |
| <span class="k">def</span> <span class="nf">make_request</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="c1"># Sending Qido request to DICOM Api</span> |
| <span class="n">project_id</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">'project_id'</span><span class="p">]</span> |
| <span class="n">region</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">'region'</span><span class="p">]</span> |
| <span class="n">dataset_id</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">'dataset_id'</span><span class="p">]</span> |
| <span class="n">dicom_store_id</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">'dicom_store_id'</span><span class="p">]</span> |
| <span class="n">search_type</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">'search_type'</span><span class="p">]</span> |
| <span class="n">params</span> <span class="o">=</span> <span class="n">element</span><span class="p">[</span><span class="s1">'params'</span><span class="p">]</span> <span class="k">if</span> <span class="s1">'params'</span> <span class="ow">in</span> <span class="n">element</span> <span class="k">else</span> <span class="kc">None</span> |
| |
| <span class="c1"># Call qido search http client</span> |
| <span class="n">result</span><span class="p">,</span> <span class="n">status_code</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">qido_search</span><span class="p">(</span> |
| <span class="n">project_id</span><span class="p">,</span> <span class="n">region</span><span class="p">,</span> <span class="n">dataset_id</span><span class="p">,</span> <span class="n">dicom_store_id</span><span class="p">,</span> |
| <span class="n">search_type</span><span class="p">,</span> <span class="n">params</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">credential</span> |
| <span class="p">)</span> |
| |
| <span class="n">out</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'result'</span><span class="p">]</span> <span class="o">=</span> <span class="n">result</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'status'</span><span class="p">]</span> <span class="o">=</span> <span class="n">status_code</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'input'</span><span class="p">]</span> <span class="o">=</span> <span class="n">element</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'success'</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span><span class="n">status_code</span> <span class="o">==</span> <span class="mi">200</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">out</span> |
| |
| <span class="k">def</span> <span class="nf">process_buffer_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffer_element</span><span class="p">):</span> |
| <span class="c1"># Thread job runner - each thread makes a Qido search request</span> |
| <span class="n">value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">make_request</span><span class="p">(</span><span class="n">buffer_element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> |
| <span class="n">windows</span> <span class="o">=</span> <span class="p">[</span><span class="n">buffer_element</span><span class="p">[</span><span class="mi">1</span><span class="p">]]</span> |
| <span class="n">timestamp</span> <span class="o">=</span> <span class="n">buffer_element</span><span class="p">[</span><span class="mi">2</span><span class="p">]</span> |
| <span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">utils</span><span class="o">.</span><span class="n">windowed_value</span><span class="o">.</span><span class="n">WindowedValue</span><span class="p">(</span> |
| <span class="n">value</span><span class="o">=</span><span class="n">value</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">timestamp</span><span class="p">,</span> <span class="n">windows</span><span class="o">=</span><span class="n">windows</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_flush</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># Create thread pool executor and process the buffered elements in parallel</span> |
| <span class="n">executor</span> <span class="o">=</span> <span class="n">ThreadPoolExecutor</span><span class="p">(</span><span class="n">max_workers</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span><span class="p">)</span> |
| <span class="n">futures</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="n">executor</span><span class="o">.</span><span class="n">submit</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">process_buffer_element</span><span class="p">,</span> <span class="n">ele</span><span class="p">)</span> <span class="k">for</span> <span class="n">ele</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> |
| <span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">as_completed</span><span class="p">(</span><span class="n">futures</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="n">f</span><span class="o">.</span><span class="n">result</span><span class="p">()</span> |
| |
| |
| <div class="viewcode-block" id="FormatToQido"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.FormatToQido">[docs]</a><span class="k">class</span> <span class="nc">FormatToQido</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="sd">"""A PTransform for converting pubsub messages into search input dict.</span> |
| <span class="sd"> Takes PCollection of string as input and returns a PCollection of dict as</span> |
| <span class="sd"> results. Note that some pubsub messages may not be from DICOM API, which</span> |
| <span class="sd"> will be recorded as failed conversions.</span> |
| <span class="sd"> INPUT:</span> |
| <span class="sd"> The inputs are normally strings from a Pubsub topic:</span> |
| <span class="sd"> "projects/PROJECT_ID/locations/LOCATION/datasets/DATASET_ID/</span> |
| <span class="sd"> dicomStores/DICOM_STORE_ID/dicomWeb/studies/STUDY_UID/</span> |
| <span class="sd"> series/SERIES_UID/instances/INSTANCE_UID"</span> |
| |
| <span class="sd"> OUTPUT:</span> |
| <span class="sd"> The output dict encodes results as well as error messages:</span> |
| <span class="sd"> {</span> |
| <span class="sd"> 'result': a dict representing instance level qido search request.</span> |
| <span class="sd"> 'success': boolean value telling whether the conversion is successful.</span> |
| <span class="sd"> 'input': input pubsub message string.</span> |
| <span class="sd"> }</span> |
| |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""Initializes FormatToQido.</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> credential: # type: Google credential object, if it is specified, the</span> |
| <span class="sd"> Http client will use it instead of the default one.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span> |
| |
| <div class="viewcode-block" id="FormatToQido.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.FormatToQido.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">_ConvertStringToQido</span><span class="p">())</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">_ConvertStringToQido</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""A DoFn for converting pubsub string to qido search parameters."""</span> |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> |
| <span class="c1"># Some constants for DICOM pubsub message</span> |
| <span class="n">NUM_PUBSUB_STR_ENTRIES</span> <span class="o">=</span> <span class="mi">15</span> |
| <span class="n">NUM_DICOM_WEBPATH_PARAMETERS</span> <span class="o">=</span> <span class="mi">5</span> |
| <span class="n">NUM_TOTAL_PARAMETERS</span> <span class="o">=</span> <span class="mi">8</span> |
| <span class="n">INDEX_PROJECT_ID</span> <span class="o">=</span> <span class="mi">1</span> |
| <span class="n">INDEX_REGION</span> <span class="o">=</span> <span class="mi">3</span> |
| <span class="n">INDEX_DATASET_ID</span> <span class="o">=</span> <span class="mi">5</span> |
| <span class="n">INDEX_DICOMSTORE_ID</span> <span class="o">=</span> <span class="mi">7</span> |
| <span class="n">INDEX_STUDY_ID</span> <span class="o">=</span> <span class="mi">10</span> |
| <span class="n">INDEX_SERIE_ID</span> <span class="o">=</span> <span class="mi">12</span> |
| <span class="n">INDEX_INSTANCE_ID</span> <span class="o">=</span> <span class="mi">14</span> |
| |
| <span class="n">entries</span> <span class="o">=</span> <span class="n">element</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'/'</span><span class="p">)</span> |
| |
| <span class="c1"># Output dict with error message, used when</span> |
| <span class="c1"># receiving invalid pubsub string.</span> |
| <span class="n">error_dict</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">error_dict</span><span class="p">[</span><span class="s1">'result'</span><span class="p">]</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">error_dict</span><span class="p">[</span><span class="s1">'input'</span><span class="p">]</span> <span class="o">=</span> <span class="n">element</span> |
| <span class="n">error_dict</span><span class="p">[</span><span class="s1">'success'</span><span class="p">]</span> <span class="o">=</span> <span class="kc">False</span> |
| |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">entries</span><span class="p">)</span> <span class="o">!=</span> <span class="n">NUM_PUBSUB_STR_ENTRIES</span><span class="p">:</span> |
| <span class="k">return</span> <span class="p">[</span><span class="n">error_dict</span><span class="p">]</span> |
| |
| <span class="n">required_keys</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="s1">'projects'</span><span class="p">,</span> |
| <span class="s1">'locations'</span><span class="p">,</span> |
| <span class="s1">'datasets'</span><span class="p">,</span> |
| <span class="s1">'dicomStores'</span><span class="p">,</span> |
| <span class="s1">'dicomWeb'</span><span class="p">,</span> |
| <span class="s1">'studies'</span><span class="p">,</span> |
| <span class="s1">'series'</span><span class="p">,</span> |
| <span class="s1">'instances'</span> |
| <span class="p">]</span> |
| |
| <span class="c1"># Check if the required keys are present and</span> |
| <span class="c1"># the positions of those keys are correct</span> |
| <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">NUM_DICOM_WEBPATH_PARAMETERS</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">required_keys</span><span class="p">[</span><span class="n">i</span><span class="p">]</span> <span class="o">!=</span> <span class="n">entries</span><span class="p">[</span><span class="n">i</span> <span class="o">*</span> <span class="mi">2</span><span class="p">]:</span> |
| <span class="k">return</span> <span class="p">[</span><span class="n">error_dict</span><span class="p">]</span> |
| <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="n">NUM_DICOM_WEBPATH_PARAMETERS</span><span class="p">,</span> <span class="n">NUM_TOTAL_PARAMETERS</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">required_keys</span><span class="p">[</span><span class="n">i</span><span class="p">]</span> <span class="o">!=</span> <span class="n">entries</span><span class="p">[</span><span class="n">i</span> <span class="o">*</span> <span class="mi">2</span> <span class="o">-</span> <span class="mi">1</span><span class="p">]:</span> |
| <span class="k">return</span> <span class="p">[</span><span class="n">error_dict</span><span class="p">]</span> |
| |
| <span class="c1"># Compose dicom webpath parameters for qido search</span> |
| <span class="n">qido_dict</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">qido_dict</span><span class="p">[</span><span class="s1">'project_id'</span><span class="p">]</span> <span class="o">=</span> <span class="n">entries</span><span class="p">[</span><span class="n">INDEX_PROJECT_ID</span><span class="p">]</span> |
| <span class="n">qido_dict</span><span class="p">[</span><span class="s1">'region'</span><span class="p">]</span> <span class="o">=</span> <span class="n">entries</span><span class="p">[</span><span class="n">INDEX_REGION</span><span class="p">]</span> |
| <span class="n">qido_dict</span><span class="p">[</span><span class="s1">'dataset_id'</span><span class="p">]</span> <span class="o">=</span> <span class="n">entries</span><span class="p">[</span><span class="n">INDEX_DATASET_ID</span><span class="p">]</span> |
| <span class="n">qido_dict</span><span class="p">[</span><span class="s1">'dicom_store_id'</span><span class="p">]</span> <span class="o">=</span> <span class="n">entries</span><span class="p">[</span><span class="n">INDEX_DICOMSTORE_ID</span><span class="p">]</span> |
| <span class="n">qido_dict</span><span class="p">[</span><span class="s1">'search_type'</span><span class="p">]</span> <span class="o">=</span> <span class="s1">'instances'</span> |
| |
| <span class="c1"># Compose instance level params for qido search</span> |
| <span class="n">params</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">params</span><span class="p">[</span><span class="s1">'StudyInstanceUID'</span><span class="p">]</span> <span class="o">=</span> <span class="n">entries</span><span class="p">[</span><span class="n">INDEX_STUDY_ID</span><span class="p">]</span> |
| <span class="n">params</span><span class="p">[</span><span class="s1">'SeriesInstanceUID'</span><span class="p">]</span> <span class="o">=</span> <span class="n">entries</span><span class="p">[</span><span class="n">INDEX_SERIE_ID</span><span class="p">]</span> |
| <span class="n">params</span><span class="p">[</span><span class="s1">'SOPInstanceUID'</span><span class="p">]</span> <span class="o">=</span> <span class="n">entries</span><span class="p">[</span><span class="n">INDEX_INSTANCE_ID</span><span class="p">]</span> |
| <span class="n">qido_dict</span><span class="p">[</span><span class="s1">'params'</span><span class="p">]</span> <span class="o">=</span> <span class="n">params</span> |
| |
| <span class="n">out</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'result'</span><span class="p">]</span> <span class="o">=</span> <span class="n">qido_dict</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'input'</span><span class="p">]</span> <span class="o">=</span> <span class="n">element</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'success'</span><span class="p">]</span> <span class="o">=</span> <span class="kc">True</span> |
| |
| <span class="k">return</span> <span class="p">[</span><span class="n">out</span><span class="p">]</span> |
| |
| |
| <div class="viewcode-block" id="UploadToDicomStore"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.UploadToDicomStore">[docs]</a><span class="k">class</span> <span class="nc">UploadToDicomStore</span><span class="p">(</span><span class="n">PTransform</span><span class="p">):</span> |
| <span class="sd">"""A PTransform for storing instances to a DICOM store.</span> |
| <span class="sd"> Takes PCollection of byte[] as input and returns a PCollection of dict as</span> |
| <span class="sd"> results. The inputs are normally DICOM file in bytes or str filename.</span> |
| <span class="sd"> INPUT:</span> |
| <span class="sd"> This PTransform supports two types of input:</span> |
| <span class="sd"> 1. Byte[]: representing dicom file.</span> |
| <span class="sd"> 2. Fileio object: stream file object.</span> |
| |
| <span class="sd"> OUTPUT:</span> |
| <span class="sd"> The output dict encodes status as well as error messages:</span> |
| <span class="sd"> {</span> |
| <span class="sd"> 'success': boolean value telling whether the store is successful.</span> |
| <span class="sd"> 'input': undeliverable data. Exactly the same as the input,</span> |
| <span class="sd"> only set if the operation failed.</span> |
| <span class="sd"> 'status': status code from the server, used as error messages.</span> |
| <span class="sd"> }</span> |
| |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">destination_dict</span><span class="p">,</span> |
| <span class="n">input_type</span><span class="p">,</span> |
| <span class="n">buffer_size</span><span class="o">=</span><span class="mi">8</span><span class="p">,</span> |
| <span class="n">max_workers</span><span class="o">=</span><span class="mi">5</span><span class="p">,</span> |
| <span class="n">client</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""Initializes UploadToDicomStore.</span> |
| <span class="sd"> Args:</span> |
| <span class="sd"> destination_dict: # type: python dict, encodes DICOM endpoint information:</span> |
| <span class="sd"> {</span> |
| <span class="sd"> 'project_id': str,</span> |
| <span class="sd"> 'region': str,</span> |
| <span class="sd"> 'dataset_id': str,</span> |
| <span class="sd"> 'dicom_store_id': str,</span> |
| <span class="sd"> }</span> |
| |
| <span class="sd"> Key-value pairs:</span> |
| <span class="sd"> * project_id: Id of the project in which the DICOM store is located. (Required)</span> |
| <span class="sd"> * region: Region where the DICOM store resides. (Required)</span> |
| <span class="sd"> * dataset_id: Id of the dataset to which the DICOM store belongs. (Required)</span> |
| <span class="sd"> * dicom_store_id: Id of the dicom store. (Required)</span> |
| |
| <span class="sd"> input_type: # type: string, could only be 'bytes' or 'fileio'</span> |
| <span class="sd"> buffer_size: # type: Int. Size of the request buffer.</span> |
| <span class="sd"> max_workers: # type: Int. Maximum number of threads a worker can</span> |
| <span class="sd"> create. If it is set to one, all the requests will be processed</span> |
| <span class="sd"> sequentially in a worker.</span> |
| <span class="sd"> client: # type: object. If it is specified, all the Api calls will</span> |
| <span class="sd"> be made by this client instead of the default one (DicomApiHttpClient).</span> |
| <span class="sd"> credential: # type: Google credential object, if it is specified, the</span> |
| <span class="sd"> Http client will use it instead of the default one.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span> <span class="o">=</span> <span class="n">destination_dict</span> |
| <span class="c1"># input_type pre-check</span> |
| <span class="k">if</span> <span class="n">input_type</span> <span class="ow">not</span> <span class="ow">in</span> <span class="p">[</span><span class="s1">'bytes'</span><span class="p">,</span> <span class="s1">'fileio'</span><span class="p">]:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">"input_type could only be 'bytes' or 'fileio'"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">input_type</span> <span class="o">=</span> <span class="n">input_type</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span> <span class="o">=</span> <span class="n">buffer_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span> <span class="o">=</span> <span class="n">max_workers</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span> |
| |
| <div class="viewcode-block" id="UploadToDicomStore.expand"><a class="viewcode-back" href="../../../../apache_beam.io.gcp.dicomio.html#apache_beam.io.gcp.dicomio.UploadToDicomStore.expand">[docs]</a> <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> |
| <span class="k">return</span> <span class="n">pcoll</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span> |
| <span class="n">_StoreInstance</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">input_type</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">credential</span><span class="p">))</span></div></div> |
| |
| |
| <span class="k">class</span> <span class="nc">_StoreInstance</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> |
| <span class="sd">"""A DoFn that reads or fetches dicom files and then pushes them to a dicom store."""</span> |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">destination_dict</span><span class="p">,</span> |
| <span class="n">input_type</span><span class="p">,</span> |
| <span class="n">buffer_size</span><span class="p">,</span> |
| <span class="n">max_workers</span><span class="p">,</span> |
| <span class="n">client</span><span class="p">,</span> |
| <span class="n">credential</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="c1"># pre-check destination dict</span> |
| <span class="n">required_keys</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'project_id'</span><span class="p">,</span> <span class="s1">'region'</span><span class="p">,</span> <span class="s1">'dataset_id'</span><span class="p">,</span> <span class="s1">'dicom_store_id'</span><span class="p">]</span> |
| <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">required_keys</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">key</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">destination_dict</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s1">'Must have </span><span class="si">%s</span><span class="s1"> in the dict.'</span> <span class="o">%</span> <span class="p">(</span><span class="n">key</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span> <span class="o">=</span> <span class="n">destination_dict</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">input_type</span> <span class="o">=</span> <span class="n">input_type</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span> <span class="o">=</span> <span class="n">buffer_size</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span> <span class="o">=</span> <span class="n">max_workers</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">client</span> <span class="o">=</span> <span class="n">client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">credential</span> <span class="o">=</span> <span class="n">credential</span> |
| |
| <span class="k">def</span> <span class="nf">start_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span> |
| |
| <span class="k">def</span> <span class="nf">finish_bundle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_flush</span><span class="p">():</span> |
| <span class="k">yield</span> <span class="n">item</span> |
| |
| <span class="k">def</span> <span class="nf">process</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">element</span><span class="p">,</span> |
| <span class="n">window</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">WindowParam</span><span class="p">,</span> |
| <span class="n">timestamp</span><span class="o">=</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="o">.</span><span class="n">TimestampParam</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="n">element</span><span class="p">,</span> <span class="n">window</span><span class="p">,</span> <span class="n">timestamp</span><span class="p">))</span> |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">buffer</span><span class="p">)</span> <span class="o">>=</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer_size</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">item</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_flush</span><span class="p">():</span> |
| <span class="k">yield</span> <span class="n">item</span> |
| |
| <span class="k">def</span> <span class="nf">make_request</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dicom_file</span><span class="p">):</span> |
| <span class="c1"># Send file to DICOM store and records the results.</span> |
| <span class="n">project_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">[</span><span class="s1">'project_id'</span><span class="p">]</span> |
| <span class="n">region</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">[</span><span class="s1">'region'</span><span class="p">]</span> |
| <span class="n">dataset_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">[</span><span class="s1">'dataset_id'</span><span class="p">]</span> |
| <span class="n">dicom_store_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">destination_dict</span><span class="p">[</span><span class="s1">'dicom_store_id'</span><span class="p">]</span> |
| |
| <span class="c1"># Feed the dicom file into store client</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="p">:</span> |
| <span class="n">_</span><span class="p">,</span> <span class="n">status_code</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">dicomweb_store_instance</span><span class="p">(</span> |
| <span class="n">project_id</span><span class="p">,</span> <span class="n">region</span><span class="p">,</span> <span class="n">dataset_id</span><span class="p">,</span> <span class="n">dicom_store_id</span><span class="p">,</span> <span class="n">dicom_file</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">credential</span> |
| <span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">_</span><span class="p">,</span> <span class="n">status_code</span> <span class="o">=</span> <span class="n">DicomApiHttpClient</span><span class="p">()</span><span class="o">.</span><span class="n">dicomweb_store_instance</span><span class="p">(</span> |
| <span class="n">project_id</span><span class="p">,</span> <span class="n">region</span><span class="p">,</span> <span class="n">dataset_id</span><span class="p">,</span> <span class="n">dicom_store_id</span><span class="p">,</span> <span class="n">dicom_file</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">credential</span> |
| <span class="p">)</span> |
| |
| <span class="n">out</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'status'</span><span class="p">]</span> <span class="o">=</span> <span class="n">status_code</span> |
| <span class="n">out</span><span class="p">[</span><span class="s1">'success'</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span><span class="n">status_code</span> <span class="o">==</span> <span class="mi">200</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">out</span> |
| |
| <span class="k">def</span> <span class="nf">read_dicom_file</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffer_element</span><span class="p">):</span> |
| <span class="c1"># Read the file based on different input. If the read fails, return</span> |
| <span class="c1"># an error dict which records input and error messages.</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">input_type</span> <span class="o">==</span> <span class="s1">'fileio'</span><span class="p">:</span> |
| <span class="n">f</span> <span class="o">=</span> <span class="n">buffer_element</span><span class="o">.</span><span class="n">open</span><span class="p">()</span> |
| <span class="n">data</span> <span class="o">=</span> <span class="n">f</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> |
| <span class="n">f</span><span class="o">.</span><span class="n">close</span><span class="p">()</span> |
| <span class="k">return</span> <span class="kc">True</span><span class="p">,</span> <span class="n">data</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">True</span><span class="p">,</span> <span class="n">buffer_element</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">error_message</span><span class="p">:</span> |
| <span class="n">error_out</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">error_out</span><span class="p">[</span><span class="s1">'status'</span><span class="p">]</span> <span class="o">=</span> <span class="n">error_message</span> |
| <span class="n">error_out</span><span class="p">[</span><span class="s1">'success'</span><span class="p">]</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="k">return</span> <span class="kc">False</span><span class="p">,</span> <span class="n">error_out</span> |
| |
| <span class="k">def</span> <span class="nf">process_buffer_element</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">buffer_element</span><span class="p">):</span> |
| <span class="c1"># Thread job runner - each thread stores a DICOM file</span> |
| <span class="n">success</span><span class="p">,</span> <span class="n">read_result</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">read_dicom_file</span><span class="p">(</span><span class="n">buffer_element</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> |
| <span class="n">windows</span> <span class="o">=</span> <span class="p">[</span><span class="n">buffer_element</span><span class="p">[</span><span class="mi">1</span><span class="p">]]</span> |
| <span class="n">timestamp</span> <span class="o">=</span> <span class="n">buffer_element</span><span class="p">[</span><span class="mi">2</span><span class="p">]</span> |
| <span class="n">value</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="n">success</span><span class="p">:</span> |
| <span class="n">value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">make_request</span><span class="p">(</span><span class="n">read_result</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">value</span> <span class="o">=</span> <span class="n">read_result</span> |
| <span class="c1"># save the undeliverable data</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">value</span><span class="p">[</span><span class="s1">'success'</span><span class="p">]:</span> |
| <span class="n">value</span><span class="p">[</span><span class="s1">'input'</span><span class="p">]</span> <span class="o">=</span> <span class="n">buffer_element</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="k">return</span> <span class="n">beam</span><span class="o">.</span><span class="n">utils</span><span class="o">.</span><span class="n">windowed_value</span><span class="o">.</span><span class="n">WindowedValue</span><span class="p">(</span> |
| <span class="n">value</span><span class="o">=</span><span class="n">value</span><span class="p">,</span> <span class="n">timestamp</span><span class="o">=</span><span class="n">timestamp</span><span class="p">,</span> <span class="n">windows</span><span class="o">=</span><span class="n">windows</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">_flush</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># Create thread pool executor and process the buffered elements in parallel</span> |
| <span class="n">executor</span> <span class="o">=</span> <span class="n">ThreadPoolExecutor</span><span class="p">(</span><span class="n">max_workers</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_workers</span><span class="p">)</span> |
| <span class="n">futures</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="n">executor</span><span class="o">.</span><span class="n">submit</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">process_buffer_element</span><span class="p">,</span> <span class="n">ele</span><span class="p">)</span> <span class="k">for</span> <span class="n">ele</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> |
| <span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">buffer</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">for</span> <span class="n">f</span> <span class="ow">in</span> <span class="n">as_completed</span><span class="p">(</span><span class="n">futures</span><span class="p">):</span> |
| <span class="k">yield</span> <span class="n">f</span><span class="o">.</span><span class="n">result</span><span class="p">()</span> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| © Copyright |
| |
| </p> |
| </div> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| jQuery(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |