|  | ################################################################################ | 
|  | #  Licensed to the Apache Software Foundation (ASF) under one | 
|  | #  or more contributor license agreements.  See the NOTICE file | 
|  | #  distributed with this work for additional information | 
|  | #  regarding copyright ownership.  The ASF licenses this file | 
|  | #  to you under the Apache License, Version 2.0 (the | 
|  | #  "License"); you may not use this file except in compliance | 
|  | #  with the License.  You may obtain a copy of the License at | 
|  | # | 
|  | #      http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | #  Unless required by applicable law or agreed to in writing, software | 
|  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
|  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | #  See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | ################################################################################ | 
|  |  | 
|  | from pyflink.java_gateway import get_gateway | 
|  | from pyflink.table.types import _to_java_type | 
|  | from pyflink.util import java_utils | 
|  |  | 
|  | __all__ = ['TableSink', 'CsvTableSink', 'WriteMode'] | 
|  |  | 
|  |  | 
|  | class TableSink(object): | 
|  | """ | 
|  | A :class:`TableSink` specifies how to emit a table to an external system or location. | 
|  | """ | 
|  |  | 
|  | def __init__(self, j_table_sink): | 
|  | self._j_table_sink = j_table_sink | 
|  |  | 
|  |  | 
|  | class WriteMode(object): | 
|  | NO_OVERWRITE = 0 | 
|  | OVERWRITE = 1 | 
|  |  | 
|  |  | 
|  | class CsvTableSink(TableSink): | 
|  | """ | 
|  | A simple :class:`TableSink` to emit data as CSV files. | 
|  |  | 
|  | Example: | 
|  | :: | 
|  |  | 
|  | >>> CsvTableSink(["a", "b"], [DataTypes.INT(), DataTypes.STRING()], | 
|  | ...              "/csv/file/path", "|", 1, WriteMode.OVERWRITE) | 
|  |  | 
|  | :param field_names: The list of field names. | 
|  | :param field_types: The list of field data types. | 
|  | :param path: The output path to write the Table to. | 
|  | :param field_delimiter: The field delimiter. | 
|  | :param num_files: The number of files to write to. | 
|  | :param write_mode: The write mode to specify whether existing files are overwritten or not, | 
|  | which contains: :data:`WriteMode.NO_OVERWRITE` | 
|  | and :data:`WriteMode.OVERWRITE`. | 
|  | """ | 
|  |  | 
|  | def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=-1, | 
|  | write_mode=None): | 
|  | gateway = get_gateway() | 
|  | if write_mode == WriteMode.NO_OVERWRITE: | 
|  | j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE | 
|  | elif write_mode == WriteMode.OVERWRITE: | 
|  | j_write_mode = gateway.jvm.org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE | 
|  | elif write_mode is None: | 
|  | j_write_mode = None | 
|  | else: | 
|  | raise Exception('Unsupported write_mode: %s' % write_mode) | 
|  | j_csv_table_sink = gateway.jvm.CsvTableSink( | 
|  | path, field_delimiter, num_files, j_write_mode) | 
|  | j_field_names = java_utils.to_jarray(gateway.jvm.String, field_names) | 
|  | j_field_types = java_utils.to_jarray( | 
|  | gateway.jvm.TypeInformation, | 
|  | [_to_java_type(field_type) for field_type in field_types]) | 
|  | j_csv_table_sink = j_csv_table_sink.configure(j_field_names, j_field_types) | 
|  | super(CsvTableSink, self).__init__(j_csv_table_sink) |