blob: bd598de2f80a9baa3d6ccd162508cbeb94ef90e1 [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sqlite3
import queue
import threading
class Db(object):
def __init__(self, db, injector):
self.db = db
self.injector = injector
self.tasks = queue.Queue()
self.position = None
self.pending_events = []
self.running = True
self.thread = threading.Thread(target=self._process)
self.thread.daemon = True
self.thread.start()
def close(self):
self.tasks.put(lambda conn: self._close())
def reset(self):
self.tasks.put(lambda conn: self._reset())
def load(self, records, event=None):
self.tasks.put(lambda conn: self._load(conn, records, event))
def get_id(self, event):
self.tasks.put(lambda conn: self._get_id(conn, event))
def insert(self, id, data, event=None):
self.tasks.put(lambda conn: self._insert(conn, id, data, event))
def delete(self, id, event=None):
self.tasks.put(lambda conn: self._delete(conn, id, event))
def _reset(self, ignored=None):
self.position = None
def _close(self, ignored=None):
self.running = False
def _get_id(self, conn, event):
cursor = conn.execute("SELECT * FROM records ORDER BY id DESC")
row = cursor.fetchone()
if event:
if row:
event.id = row['id']
else:
event.id = 0
self.injector.trigger(event)
def _load(self, conn, records, event):
if self.position:
cursor = conn.execute("SELECT * FROM records WHERE id > ? ORDER BY id", (self.position,))
else:
cursor = conn.execute("SELECT * FROM records ORDER BY id")
while not records.full():
row = cursor.fetchone()
if row:
self.position = row['id']
records.put(dict(row))
else:
break
if event:
self.injector.trigger(event)
def _insert(self, conn, id, data, event):
if id:
conn.execute("INSERT INTO records(id, description) VALUES (?, ?)", (id, data))
else:
conn.execute("INSERT INTO records(description) VALUES (?)", (data,))
if event:
self.pending_events.append(event)
def _delete(self, conn, id, event):
conn.execute("DELETE FROM records WHERE id=?", (id,))
if event:
self.pending_events.append(event)
def _process(self):
conn = sqlite3.connect(self.db)
conn.row_factory = sqlite3.Row
with conn:
while self.running:
f = self.tasks.get(True)
try:
while True:
f(conn)
f = self.tasks.get(False)
except queue.Empty:
pass
conn.commit()
for event in self.pending_events:
self.injector.trigger(event)
self.pending_events = []
self.injector.close()