| # -*- coding: utf-8 -*- |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| ''' |
| Unit tests for cmislib |
| ''' |
| import unittest |
| from unittest import TestSuite, TestLoader |
| from cmislib.model import CmisClient, ACE |
| from cmislib.exceptions import \ |
| ObjectNotFoundException, \ |
| PermissionDeniedException, \ |
| CmisException, \ |
| NotSupportedException |
| from cmislib import messages |
| import os |
| from time import sleep, time |
| import settings |
| |
| ## Fix test file paths in case test is launched using nosetests |
| my_dir = os.path.dirname(os.path.abspath(__file__)) |
| try: |
| os.stat(settings.TEST_BINARY_1) |
| except: |
| settings.TEST_BINARY_1 = os.path.join(my_dir, settings.TEST_BINARY_1) |
| try: |
| os.stat(settings.TEST_BINARY_2) |
| except: |
| settings.TEST_BINARY_2 = os.path.join(my_dir, settings.TEST_BINARY_2) |
| |
| |
| class CmisTestBase(unittest.TestCase): |
| |
| """ Common ancestor class for most cmislib unit test classes. """ |
| |
| def setUp(self): |
| """ Create a root test folder for the test. """ |
| self._cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| self._repo = self._cmisClient.getDefaultRepository() |
| self._rootFolder = self._repo.getObjectByPath(settings.TEST_ROOT_PATH) |
| self._folderName = " ".join(['cmislib', self.__class__.__name__, str(time())]) |
| self._testFolder = self._rootFolder.createFolder(self._folderName) |
| |
| def tearDown(self): |
| """ Clean up after the test. """ |
| self._testFolder.deleteTree() |
| |
| |
| class CmisClientTest(unittest.TestCase): |
| |
| """ Tests for the :class:`CmisClient` class. """ |
| |
| def testCmisClient(self): |
| '''Instantiate a CmisClient object''' |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| self.assert_(cmisClient != None) |
| |
| def testGetRepositories(self): |
| '''Call getRepositories and make sure at least one comes back with |
| an ID and a name |
| ''' |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| repoInfo = cmisClient.getRepositories() |
| self.assert_(len(repoInfo) >= 1) |
| self.assert_('repositoryId' in repoInfo[0]) |
| self.assert_('repositoryName' in repoInfo[0]) |
| |
| def testDefaultRepository(self): |
| '''Get the default repository by calling the repo's service URL''' |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| repo = cmisClient.getDefaultRepository() |
| self.assert_(repo != None) |
| self.assert_(repo.getRepositoryId() != None) |
| |
| def testGetRepository(self): |
| '''Get a repository by repository ID''' |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| repo = cmisClient.getDefaultRepository() |
| defaultRepoId = repo.getRepositoryId() |
| defaultRepoName = repo.getRepositoryName() |
| repo = cmisClient.getRepository(defaultRepoId) |
| self.assertEquals(defaultRepoId, repo.getRepositoryId()) |
| self.assertEquals(defaultRepoName, repo.getRepositoryName()) |
| |
| # Error conditions |
| def testCmisClientBadUrl(self): |
| '''Try to instantiate a CmisClient object with a known bad URL''' |
| cmisClient = CmisClient(settings.REPOSITORY_URL + 'foobar', settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| self.assertRaises(CmisException, cmisClient.getRepositories) |
| |
| def testCmisClientBadAuth(self): |
| '''Try to instantiate a CmisClient object with bad creds''' |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, 'BADPASS') |
| self.assertRaises(PermissionDeniedException, |
| cmisClient.getRepositories) |
| |
| def testGetRepositoryBadId(self): |
| '''Try to get a repository with a bad repo ID''' |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| self.assertRaises(ObjectNotFoundException, |
| cmisClient.getRepository, |
| '123FOO') |
| |
| |
| class QueryTest(CmisTestBase): |
| |
| """ Tests related to running CMIS queries. """ |
| |
| # TODO: Test the rest of these queries |
| # queryDateRange = "SELECT cmis:name from cmis:document " \ |
| # "where cmis:creationDate >= TIMESTAMP'2009-11-10T00:00:00.000-06:00' and " \ |
| # "cmis:creationDate < TIMESTAMP'2009-11-18T00:00:00.000-06:00'" |
| # queryFolderFullText = "SELECT cmis:name from cmis:document " \ |
| # "where in_folder('workspace://SpacesStore/3935ce21-9f6f-4d46-9e22-4f97e1d5d9d8') " \ |
| # "and contains('contract')" |
| # queryCombined = "SELECT cmis:name from cmis:document " \ |
| # "where in_tree('workspace://SpacesStore/3935ce21-9f6f-4d46-9e22-4f97e1d5d9d8') and " \ |
| # "contains('contract') and cm:description like \"%sign%\"" |
| |
| def setUp(self): |
| """ |
| Override the base setUp to include creating a couple |
| of test docs. |
| """ |
| CmisTestBase.setUp(self) |
| # I think this may be an Alfresco bug. The CMIS query results contain |
| # 1 less entry element than the number of search results. So this test |
| # will create two documents and search for the second one which should |
| # work in all repositories. |
| testFile = open(settings.TEST_BINARY_2, 'rb') |
| self._testContent = self._testFolder.createDocument(testFile.name, contentFile=testFile) |
| testFile.close() |
| testFile = open(settings.TEST_BINARY_2, 'rb') |
| self._testContent2 = self._testFolder.createDocument(settings.TEST_BINARY_2.replace('.', '2.'), contentFile=testFile) |
| testFile.close() |
| self._maxFullTextTries = settings.MAX_FULL_TEXT_TRIES |
| |
| def testSimpleSelect(self): |
| '''Execute simple select star from cmis:document''' |
| querySimpleSelect = "SELECT * FROM cmis:document" |
| resultSet = self._repo.query(querySimpleSelect) |
| self.assertTrue(isInResultSet(resultSet, self._testContent)) |
| |
| def testWildcardPropertyMatch(self): |
| '''Find content w/wildcard match on cmis:name property''' |
| name = self._testContent.getProperties()['cmis:name'] |
| querySimpleSelect = "SELECT * FROM cmis:document where cmis:name like '" + name[:7] + "%'" |
| resultSet = self._repo.query(querySimpleSelect) |
| self.assertTrue(isInResultSet(resultSet, self._testContent)) |
| |
| def testPropertyMatch(self): |
| '''Find content matching cmis:name property''' |
| name = self._testContent2.getProperties()['cmis:name'] |
| querySimpleSelect = "SELECT * FROM cmis:document where cmis:name = '" + name + "'" |
| resultSet = self._repo.query(querySimpleSelect) |
| self.assertTrue(isInResultSet(resultSet, self._testContent2)) |
| |
| def testFullText(self): |
| '''Find content using a full-text query''' |
| queryFullText = "SELECT cmis:objectId, cmis:name FROM cmis:document " \ |
| "WHERE contains('whitepaper')" |
| # on the first full text search the indexer may need a chance to |
| # do its thing |
| found = False |
| maxTries = self._maxFullTextTries |
| while not found and (maxTries > 0): |
| resultSet = self._repo.query(queryFullText) |
| found = isInResultSet(resultSet, self._testContent2) |
| if not found: |
| maxTries -= 1 |
| print 'Not found...sleeping for 10 secs. Remaining tries:%d' % maxTries |
| sleep(settings.FULL_TEXT_WAIT) |
| self.assertTrue(found) |
| |
| def testScore(self): |
| '''Find content using FT, sorted by relevance score''' |
| queryScore = "SELECT cmis:objectId, cmis:name, Score() as relevance " \ |
| "FROM cmis:document WHERE contains('sample') " \ |
| "order by relevance DESC" |
| |
| # on the first full text search the indexer may need a chance to |
| # do its thing |
| found = False |
| maxTries = self._maxFullTextTries |
| while not found and (maxTries > 0): |
| resultSet = self._repo.query(queryScore) |
| found = isInResultSet(resultSet, self._testContent2) |
| if not found: |
| maxTries -= 1 |
| print 'Not found...sleeping for 10 secs. Remaining tries:%d' % maxTries |
| sleep(10) |
| self.assertTrue(found) |
| |
| |
| class RepositoryTest(CmisTestBase): |
| |
| """ Tests for the :class:`Repository` class. """ |
| |
| def testRepositoryInfo(self): |
| '''Retrieve repository info''' |
| repoInfo = self._repo.getRepositoryInfo() |
| self.assertTrue('repositoryId' in repoInfo) |
| self.assertTrue('repositoryName' in repoInfo) |
| self.assertTrue('repositoryDescription' in repoInfo) |
| self.assertTrue('vendorName' in repoInfo) |
| self.assertTrue('productName' in repoInfo) |
| self.assertTrue('productVersion' in repoInfo) |
| self.assertTrue('rootFolderId' in repoInfo) |
| self.assertTrue('cmisVersionSupported' in repoInfo) |
| |
| def testRepositoryCapabilities(self): |
| '''Retrieve repository capabilities''' |
| caps = self._repo.getCapabilities() |
| self.assertTrue('ACL' in caps) |
| self.assertTrue('AllVersionsSearchable' in caps) |
| self.assertTrue('Changes' in caps) |
| self.assertTrue('ContentStreamUpdatability' in caps) |
| self.assertTrue('GetDescendants' in caps) |
| self.assertTrue('GetFolderTree' in caps) |
| self.assertTrue('Multifiling' in caps) |
| self.assertTrue('PWCSearchable' in caps) |
| self.assertTrue('PWCUpdatable' in caps) |
| self.assertTrue('Query' in caps) |
| self.assertTrue('Renditions' in caps) |
| self.assertTrue('Unfiling' in caps) |
| self.assertTrue('VersionSpecificFiling' in caps) |
| self.assertTrue('Join' in caps) |
| |
| def testGetRootFolder(self): |
| '''Get the root folder of the repository''' |
| rootFolder = self._repo.getRootFolder() |
| self.assert_(rootFolder != None) |
| self.assert_(rootFolder.getObjectId() != None) |
| |
| def testCreateFolder(self): |
| '''Create a new folder in the root folder''' |
| folderName = 'testCreateFolder folder' |
| newFolder = self._repo.createFolder(self._rootFolder, folderName) |
| self.assertEquals(folderName, newFolder.getName()) |
| newFolder.delete() |
| |
| def testCreateDocument(self): |
| '''Create a new 'content-less' document''' |
| documentName = 'testDocument' |
| newDoc = self._repo.createDocument(documentName, parentFolder=self._testFolder) |
| self.assertEquals(documentName, newDoc.getName()) |
| |
| def testCreateDocumentFromString(self): |
| '''Create a new document from a string''' |
| documentName = 'testDocument' |
| contentString = 'Test content string' |
| newDoc = self._repo.createDocumentFromString(documentName, |
| parentFolder=self._testFolder, |
| contentString=contentString, |
| contentType='text/plain') |
| self.assertEquals(documentName, newDoc.getName()) |
| self.assertEquals(newDoc.getContentStream().read(), contentString) |
| |
| # CMIS-279 |
| def testCreateDocumentUnicode(self): |
| '''Create a new doc with unicode characters in the name''' |
| documentName = u'abc cdeöäüß%§-_caféè.txt' |
| newDoc = self._repo.createDocument(documentName, parentFolder=self._testFolder) |
| self.assertEquals(documentName, newDoc.getName()) |
| |
| def testGetObject(self): |
| '''Create a test folder then attempt to retrieve it as a |
| :class:`CmisObject` object using its object ID''' |
| folderName = 'testGetObject folder' |
| newFolder = self._repo.createFolder(self._testFolder, folderName) |
| objectId = newFolder.getObjectId() |
| someObject = self._repo.getObject(objectId) |
| self.assertEquals(folderName, someObject.getName()) |
| newFolder.delete() |
| |
| def testReturnVersion(self): |
| '''Get latest and latestmajor versions of an object''' |
| f = open(settings.TEST_BINARY_1, 'rb') |
| doc10 = self._testFolder.createDocument(settings.TEST_BINARY_1, contentFile=f) |
| doc10Id = doc10.getObjectId() |
| if (not doc10.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc10.checkout() |
| doc11 = pwc.checkin(major='false') # checkin a minor version, 1.1 |
| if (not doc11.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc11.checkout() |
| doc20 = pwc.checkin() # checkin a major version, 2.0 |
| doc20Id = doc20.getObjectId() |
| if (not doc20.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc20.checkout() |
| doc21 = pwc.checkin(major='false') # checkin a minor version, 2.1 |
| doc21Id = doc21.getObjectId() |
| |
| docLatest = self._repo.getObject(doc10Id, returnVersion='latest') |
| self.assertEquals(doc21Id, docLatest.getObjectId()) |
| |
| docLatestMajor = self._repo.getObject(doc10Id, returnVersion='latestmajor') |
| self.assertEquals(doc20Id, docLatestMajor.getObjectId()) |
| |
| def testGetFolder(self): |
| '''Create a test folder then attempt to retrieve the Folder object |
| using its object ID''' |
| folderName = 'testGetFolder folder' |
| newFolder = self._repo.createFolder(self._testFolder, folderName) |
| objectId = newFolder.getObjectId() |
| someFolder = self._repo.getFolder(objectId) |
| self.assertEquals(folderName, someFolder.getName()) |
| newFolder.delete() |
| |
| def testGetObjectByPath(self): |
| '''Create test objects (one folder, one document) then try to get |
| them by path''' |
| # names of folders and test docs |
| testFolderName = self._testFolder.getName() |
| parentFolderName = 'testGetObjectByPath folder' |
| subFolderName = 'subfolder' |
| docName = 'testdoc' |
| |
| # create the folder structure |
| parentFolder = self._testFolder.createFolder(parentFolderName) |
| subFolder = parentFolder.createFolder(subFolderName) |
| # use the subfolder path to get the folder by path |
| subFolderPath = subFolder.getProperties().get("cmis:path") |
| searchFolder = self._repo.getObjectByPath(subFolderPath) |
| self.assertEquals(subFolder.getObjectId(), searchFolder.getObjectId()) |
| |
| # create a test doc |
| doc = subFolder.createDocument(docName) |
| # ask the doc for its paths |
| searchDocPaths = doc.getPaths() |
| # for each path in the list, try to get the object by path |
| # this is better than building a path with the doc's name b/c the name |
| # isn't guaranteed to be used as the path segment (see CMIS-232) |
| for path in searchDocPaths: |
| searchDoc = self._repo.getObjectByPath(path) |
| self.assertEquals(doc.getObjectId(), searchDoc.getObjectId()) |
| |
| # get the subfolder by path, then ask for its children |
| subFolder = self._repo.getObjectByPath(subFolderPath) |
| self.assertEquals(len(subFolder.getChildren().getResults()), 1) |
| |
| def testGetUnfiledDocs(self): |
| '''Tests the repository's unfiled collection''' |
| |
| if not self._repo.getCapabilities()['Unfiling']: |
| print 'Repo does not support unfiling, skipping' |
| return |
| |
| # create a test folder and test doc |
| testFolder = self._testFolder.createFolder('unfile test') |
| newDoc = testFolder.createDocument('testdoc') |
| |
| # make sure the new doc isn't in the unfiled collection |
| try: |
| rs = self._repo.getUnfiledDocs() |
| self.assertFalse(isInResultSet(rs, newDoc)) |
| except NotSupportedException: |
| print 'This repository does not support read access to the unfiled collection...skipping' |
| return |
| |
| # delete the test folder and tell it to unfile the testdoc |
| objId = newDoc.getObjectId() |
| testFolder.deleteTree(unfileObjects='unfile') |
| |
| # grab the document by object ID |
| newDoc = self._repo.getObject(objId) |
| |
| # the doc should now be in the unfiled collection |
| self.assertTrue(isInResultSet(self._repo.getUnfiledDocs(), newDoc)) |
| self.assertEquals('testdoc', newDoc.getTitle()) |
| |
| # def testCreateUnfiledDocument(self): |
| # '''Create a new unfiled document''' |
| # if self._repo.getCapabilities()['Unfiling'] != True: |
| # print 'Repo does not support unfiling, skipping' |
| # return |
| # documentName = 'testDocument' |
| # newDoc = self._repo.createDocument(documentName) |
| # self.assertEquals(documentName, newDoc.getName()) |
| |
| def testMoveDocument(self): |
| '''Move a Document from one folder to another folder''' |
| subFolder1 = self._testFolder.createFolder('sub1') |
| doc = subFolder1.createDocument('testdoc1') |
| self.assertEquals(len(subFolder1.getChildren()), 1) |
| subFolder2 = self._testFolder.createFolder('sub2') |
| self.assertEquals(len(subFolder2.getChildren()), 0) |
| doc.move(subFolder1, subFolder2) |
| self.assertEquals(len(subFolder1.getChildren()), 0) |
| self.assertEquals(len(subFolder2.getChildren()), 1) |
| self.assertEquals(doc.name, subFolder2.getChildren()[0].name) |
| |
| #Exceptions |
| |
| def testGetObjectBadId(self): |
| '''Attempt to get an object using a known bad ID''' |
| # this object ID is implementation specific (Alfresco) but is universally |
| # bad so it should work for all repositories |
| self.assertRaises(ObjectNotFoundException, |
| self._repo.getObject, |
| self._testFolder.getObjectId()[:-5] + 'BADID') |
| |
| def testGetObjectBadPath(self): |
| '''Attempt to get an object using a known bad path''' |
| self.assertRaises(ObjectNotFoundException, |
| self._repo.getObjectByPath, |
| '/123foo/BAR.jtp') |
| |
| |
| class FolderTest(CmisTestBase): |
| |
| """ Tests for the :class:`Folder` class """ |
| |
| def testGetChildren(self): |
| '''Get the children of the test folder''' |
| childFolderName1 = 'testchild1' |
| childFolderName2 = 'testchild2' |
| grandChildFolderName = 'testgrandchild' |
| childFolder1 = self._testFolder.createFolder(childFolderName1) |
| childFolder2 = self._testFolder.createFolder(childFolderName2) |
| grandChild = childFolder2.createFolder(grandChildFolderName) |
| resultSet = self._testFolder.getChildren() |
| self.assert_(resultSet != None) |
| self.assertEquals(2, len(resultSet.getResults())) |
| self.assertTrue(isInResultSet(resultSet, childFolder1)) |
| self.assertTrue(isInResultSet(resultSet, childFolder2)) |
| self.assertFalse(isInResultSet(resultSet, grandChild)) |
| |
| def testGetDescendants(self): |
| '''Get the descendants of the root folder''' |
| childFolderName1 = 'testchild1' |
| childFolderName2 = 'testchild2' |
| grandChildFolderName1 = 'testgrandchild' |
| childFolder1 = self._testFolder.createFolder(childFolderName1) |
| childFolder2 = self._testFolder.createFolder(childFolderName2) |
| grandChild = childFolder1.createFolder(grandChildFolderName1) |
| |
| # test getting descendants with depth=1 |
| resultSet = self._testFolder.getDescendants(depth=1) |
| self.assert_(resultSet != None) |
| self.assertEquals(2, len(resultSet.getResults())) |
| self.assertTrue(isInResultSet(resultSet, childFolder1)) |
| self.assertTrue(isInResultSet(resultSet, childFolder2)) |
| self.assertFalse(isInResultSet(resultSet, grandChild)) |
| |
| # test getting descendants with depth=2 |
| resultSet = self._testFolder.getDescendants(depth=2) |
| self.assert_(resultSet != None) |
| self.assertEquals(3, len(resultSet.getResults())) |
| self.assertTrue(isInResultSet(resultSet, childFolder1)) |
| self.assertTrue(isInResultSet(resultSet, childFolder2)) |
| self.assertTrue(isInResultSet(resultSet, grandChild)) |
| |
| # test getting descendants with depth=-1 |
| resultSet = self._testFolder.getDescendants() # -1 is the default depth |
| self.assert_(resultSet != None) |
| self.assertEquals(3, len(resultSet.getResults())) |
| self.assertTrue(isInResultSet(resultSet, childFolder1)) |
| self.assertTrue(isInResultSet(resultSet, childFolder2)) |
| self.assertTrue(isInResultSet(resultSet, grandChild)) |
| |
| def testGetTree(self): |
| '''Get the folder tree of the test folder''' |
| childFolderName1 = 'testchild1' |
| childFolderName2 = 'testchild2' |
| grandChildFolderName1 = 'testgrandchild' |
| childFolder1 = self._testFolder.createFolder(childFolderName1) |
| childFolder1.createDocument('testdoc1') |
| childFolder2 = self._testFolder.createFolder(childFolderName2) |
| childFolder2.createDocument('testdoc2') |
| grandChild = childFolder1.createFolder(grandChildFolderName1) |
| grandChild.createDocument('testdoc3') |
| |
| # test getting tree with depth=1 |
| resultSet = self._testFolder.getTree(depth=1) |
| self.assert_(resultSet != None) |
| self.assertEquals(2, len(resultSet.getResults())) |
| self.assertTrue(isInResultSet(resultSet, childFolder1)) |
| self.assertTrue(isInResultSet(resultSet, childFolder2)) |
| self.assertFalse(isInResultSet(resultSet, grandChild)) |
| |
| # test getting tree with depth=2 |
| resultSet = self._testFolder.getTree(depth=2) |
| self.assert_(resultSet != None) |
| self.assertEquals(3, len(resultSet.getResults())) |
| self.assertTrue(isInResultSet(resultSet, childFolder1)) |
| self.assertTrue(isInResultSet(resultSet, childFolder2)) |
| self.assertTrue(isInResultSet(resultSet, grandChild)) |
| |
| def testDeleteEmptyFolder(self): |
| '''Create a test folder, then delete it''' |
| folderName = 'testDeleteEmptyFolder folder' |
| testFolder = self._testFolder.createFolder(folderName) |
| self.assertEquals(folderName, testFolder.getName()) |
| newFolder = testFolder.createFolder('testFolder') |
| testFolderChildren = testFolder.getChildren() |
| self.assertEquals(1, len(testFolderChildren.getResults())) |
| newFolder.delete() |
| testFolderChildren = testFolder.getChildren() |
| self.assertEquals(0, len(testFolderChildren.getResults())) |
| |
| def testDeleteNonEmptyFolder(self): |
| '''Create a test folder with something in it, then delete it''' |
| folderName = 'testDeleteNonEmptyFolder folder' |
| testFolder = self._testFolder.createFolder(folderName) |
| self.assertEquals(folderName, testFolder.getName()) |
| newFolder = testFolder.createFolder('testFolder') |
| testFolderChildren = testFolder.getChildren() |
| self.assertEquals(1, len(testFolderChildren.getResults())) |
| newFolder.createDocument('testDoc') |
| self.assertEquals(1, len(newFolder.getChildren().getResults())) |
| newFolder.deleteTree() |
| testFolderChildren = testFolder.getChildren() |
| self.assertEquals(0, len(testFolderChildren.getResults())) |
| |
| def testGetProperties(self): |
| '''Get the root folder, then get its properties''' |
| props = self._testFolder.getProperties() |
| self.assert_(props != None) |
| self.assert_('cmis:objectId' in props) |
| self.assert_(props['cmis:objectId'] != None) |
| self.assert_('cmis:objectTypeId' in props) |
| self.assert_(props['cmis:objectTypeId'] != None) |
| self.assert_('cmis:name' in props) |
| self.assert_(props['cmis:name'] != None) |
| |
| def testPropertyFilter(self): |
| '''Test the properties filter''' |
| # names of folders and test docs |
| parentFolderName = 'testGetObjectByPath folder' |
| subFolderName = 'subfolder' |
| |
| # create the folder structure |
| parentFolder = self._testFolder.createFolder(parentFolderName) |
| subFolder = parentFolder.createFolder(subFolderName) |
| subFolderPath = subFolder.getProperties().get("cmis:path") |
| |
| # Per CMIS-170, CMIS providers are not required to filter the |
| # properties returned. So these tests will check only for the presence |
| # of the properties asked for, not the absence of properties that |
| # should be filtered if the server chooses to do so. |
| |
| # test when used with getObjectByPath |
| searchFolder = self._repo.getObjectByPath(subFolderPath, |
| filter='cmis:objectId,cmis:objectTypeId,cmis:baseTypeId') |
| self.assertEquals(subFolder.getObjectId(), searchFolder.getObjectId()) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectTypeId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:baseTypeId')) |
| |
| # test when used with getObjectByPath + reload |
| searchFolder = self._repo.getObjectByPath(subFolderPath, |
| filter='cmis:objectId,cmis:objectTypeId,cmis:baseTypeId') |
| searchFolder.reload() |
| self.assertEquals(subFolder.getObjectId(), searchFolder.getObjectId()) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectTypeId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:baseTypeId')) |
| |
| # test when used with getObject |
| searchFolder = self._repo.getObject(subFolder.getObjectId(), |
| filter='cmis:objectId,cmis:objectTypeId,cmis:baseTypeId') |
| self.assertEquals(subFolder.getObjectId(), searchFolder.getObjectId()) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectTypeId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:baseTypeId')) |
| |
| # test when used with getObject + reload |
| searchFolder = self._repo.getObject(subFolder.getObjectId(), |
| filter='cmis:objectId,cmis:objectTypeId,cmis:baseTypeId') |
| searchFolder.reload() |
| self.assertEquals(subFolder.getObjectId(), searchFolder.getObjectId()) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectTypeId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:baseTypeId')) |
| |
| # test that you can do a reload with a reset filter |
| searchFolder.reload(filter='*') |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:objectTypeId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:baseTypeId')) |
| self.assertTrue(searchFolder.getProperties().has_key('cmis:name')) |
| |
| def testUpdateProperties(self): |
| '''Create a test folder, then update its properties''' |
| folderName = 'testUpdateProperties folder' |
| newFolder = self._testFolder.createFolder(folderName) |
| self.assertEquals(folderName, newFolder.getName()) |
| folderName2 = 'testUpdateProperties folder2' |
| props = {'cmis:name': folderName2} |
| newFolder.updateProperties(props) |
| self.assertEquals(folderName2, newFolder.getName()) |
| |
| def testSubFolder(self): |
| '''Create a test folder, then create a test folder within that.''' |
| parentFolder = self._testFolder.createFolder('testSubFolder folder') |
| self.assert_('cmis:objectId' in parentFolder.getProperties()) |
| childFolder = parentFolder.createFolder('child folder') |
| self.assert_('cmis:objectId' in childFolder.getProperties()) |
| self.assert_(childFolder.getProperties()['cmis:objectId'] != None) |
| |
| def testAllowableActions(self): |
| '''Create a test folder, then get its allowable actions''' |
| actions = self._testFolder.getAllowableActions() |
| self.assert_(len(actions) > 0) |
| |
| def testGetParent(self): |
| '''Get a folder's parent using the getParent call''' |
| childFolder = self._testFolder.createFolder('parentTest') |
| parentFolder = childFolder.getParent() |
| self.assertEquals(self._testFolder.getObjectId(), parentFolder.getObjectId()) |
| |
| def testAddObject(self): |
| '''Add an existing object to another folder''' |
| if not self._repo.getCapabilities()['Multifiling']: |
| print 'This repository does not allow multifiling, skipping' |
| return |
| |
| subFolder1 = self._testFolder.createFolder('sub1') |
| doc = subFolder1.createDocument('testdoc1') |
| self.assertEquals(len(subFolder1.getChildren()), 1) |
| subFolder2 = self._testFolder.createFolder('sub2') |
| self.assertEquals(len(subFolder2.getChildren()), 0) |
| subFolder2.addObject(doc) |
| self.assertEquals(len(subFolder2.getChildren()), 1) |
| self.assertEquals(subFolder1.getChildren()[0].name, subFolder2.getChildren()[0].name) |
| |
| def testRemoveObject(self): |
| '''Remove an existing object from a secondary folder''' |
| if not self._repo.getCapabilities()['Unfiling']: |
| print 'This repository does not allow unfiling, skipping' |
| return |
| |
| subFolder1 = self._testFolder.createFolder('sub1') |
| doc = subFolder1.createDocument('testdoc1') |
| self.assertEquals(len(subFolder1.getChildren()), 1) |
| subFolder2 = self._testFolder.createFolder('sub2') |
| self.assertEquals(len(subFolder2.getChildren()), 0) |
| subFolder2.addObject(doc) |
| self.assertEquals(len(subFolder2.getChildren()), 1) |
| self.assertEquals(subFolder1.getChildren()[0].name, subFolder2.getChildren()[0].name) |
| subFolder2.removeObject(doc) |
| self.assertEquals(len(subFolder2.getChildren()), 0) |
| self.assertEquals(len(subFolder1.getChildren()), 1) |
| self.assertEquals(doc.name, subFolder1.getChildren()[0].name) |
| |
| def testFolderLeadingDot(self): |
| '''Create a folder with a leading dot in its name''' |
| leadingDotFolder = self._testFolder.createFolder('.leadingDot') |
| resultSet = self._testFolder.getChildren() |
| self.assert_(resultSet != None) |
| self.assertTrue(leadingDotFolder.getName().startswith('.')) |
| |
| def testFolderTrailingDot(self): |
| '''Create a folder with a trailing dot in its name''' |
| trailingDotFolder = self._testFolder.createFolder('trailingDot.') |
| resultSet = self._testFolder.getChildren() |
| self.assert_(resultSet != None) |
| self.assertTrue(trailingDotFolder.getName().endswith('.')) |
| |
| def testGetPaths(self): |
| '''Get a folder's paths''' |
| # ask the root for its path |
| root = self._repo.getRootFolder() |
| paths = root.getPaths() |
| self.assertTrue(len(paths) == 1) |
| self.assertTrue(paths[0] == '/') |
| # ask the test folder for its paths |
| paths = self._testFolder.getPaths() |
| self.assertTrue(len(paths) == 1) |
| |
| # Exceptions |
| |
| def testBadParentFolder(self): |
| '''Try to create a folder on a bad/bogus/deleted parent |
| folder object''' |
| firstFolder = self._testFolder.createFolder('testBadParentFolder folder') |
| self.assert_('cmis:objectId' in firstFolder.getProperties()) |
| firstFolder.delete() |
| # folder isn't in the repo anymore, but I still have the object |
| # really, this seems like it ought to be an ObjectNotFoundException but |
| # not all CMIS providers report it as such |
| self.assertRaises(CmisException, |
| firstFolder.createFolder, |
| 'bad parent') |
| |
| # Per CMIS-169, nothing in the spec says that an exception should be thrown |
| # when a duplicate folder is created, so this test is really not necessary. |
| # def testDuplicateFolder(self): |
| # '''Try to create a folder that already exists''' |
| # folderName = 'testDupFolder folder' |
| # firstFolder = self._testFolder.createFolder(folderName) |
| # self.assert_('cmis:objectId' in firstFolder.getProperties()) |
| # # really, this needs to be ContentAlreadyExistsException but |
| # # not all CMIS providers report it as such |
| # self.assertRaises(CmisException, |
| # self._testFolder.createFolder, |
| # folderName) |
| |
| |
| class ChangeEntryTest(CmisTestBase): |
| |
| """ Tests for the :class:`ChangeEntry` class """ |
| |
| def testGetContentChanges(self): |
| |
| """Get the content changes and inspect Change Entry props""" |
| |
| # need to check changes capability |
| if not self._repo.capabilities['Changes']: |
| print messages.NO_CHANGE_LOG_SUPPORT |
| return |
| |
| # at least one change should have been made due to the creation of the |
| # test documents |
| rs = self._repo.getContentChanges() |
| self.assertTrue(len(rs) > 0) |
| changeEntry = rs[0] |
| self.assertTrue(changeEntry.id) |
| self.assertTrue(changeEntry.changeType in ['created', 'updated', 'deleted']) |
| self.assertTrue(changeEntry.changeTime) |
| |
| def testGetACL(self): |
| |
| """Gets the ACL that is included with a Change Entry.""" |
| |
| # need to check changes capability |
| if not self._repo.capabilities['Changes']: |
| print messages.NO_CHANGE_LOG_SUPPORT |
| return |
| |
| # need to check ACL capability |
| if not self._repo.capabilities['ACL']: |
| print messages.NO_ACL_SUPPORT |
| return |
| |
| # need to test once with includeACL set to true |
| rs = self._repo.getContentChanges(includeACL='true') |
| self.assertTrue(len(rs) > 0) |
| changeEntry = rs[0] |
| acl = changeEntry.getACL() |
| self.assertTrue(acl) |
| for entry in acl.getEntries().values(): |
| self.assertTrue(entry.principalId) |
| self.assertTrue(entry.permissions) |
| |
| # need to test once without includeACL set |
| rs = self._repo.getContentChanges() |
| self.assertTrue(len(rs) > 0) |
| changeEntry = rs[0] |
| acl = changeEntry.getACL() |
| self.assertTrue(acl) |
| for entry in acl.getEntries().values(): |
| self.assertTrue(entry.principalId) |
| self.assertTrue(entry.permissions) |
| |
| def testGetProperties(self): |
| |
| """Gets the properties of an object included with a Change Entry.""" |
| |
| # need to check changes capability |
| changeCap = self._repo.capabilities['Changes'] |
| if not changeCap: |
| print messages.NO_CHANGE_LOG_SUPPORT |
| return |
| |
| # need to test once without includeProperties set. the objectID should be there |
| rs = self._repo.getContentChanges() |
| self.assertTrue(len(rs) > 0) |
| changeEntry = rs[0] |
| self.assertTrue(changeEntry.properties['cmis:objectId']) |
| |
| # need to test once with includeProperties set. the objectID should be there plus object props |
| if changeCap in ['properties', 'all']: |
| rs = self._repo.getContentChanges(includeProperties='true') |
| self.assertTrue(len(rs) > 0) |
| changeEntry = rs[0] |
| self.assertTrue(changeEntry.properties['cmis:objectId']) |
| self.assertTrue(changeEntry.properties['cmis:name']) |
| |
| |
| class DocumentTest(CmisTestBase): |
| |
| """ Tests for the :class:`Document` class """ |
| |
| def testCheckout(self): |
| '''Create a document in a test folder, then check it out''' |
| newDoc = self._testFolder.createDocument('testDocument') |
| if (not newDoc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwcDoc = newDoc.checkout() |
| try: |
| self.assertTrue(newDoc.isCheckedOut()) |
| self.assert_('cmis:objectId' in newDoc.getProperties()) |
| self.assert_('cmis:objectId' in pwcDoc.getProperties()) |
| checkedOutDocs = self._repo.getCollection('checkedout') |
| self.assertTrue(isInResultSet(checkedOutDocs, pwcDoc)) |
| finally: |
| pwcDoc.delete() |
| |
| def testCheckin(self): |
| '''Create a document in a test folder, check it out, then in''' |
| testFilename = settings.TEST_BINARY_1 |
| contentFile = open(testFilename, 'rb') |
| testDoc = self._testFolder.createDocument(testFilename, contentFile=contentFile) |
| contentFile.close() |
| self.assertEquals(testFilename, testDoc.getName()) |
| if (not testDoc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwcDoc = testDoc.checkout() |
| |
| try: |
| self.assertTrue(testDoc.isCheckedOut()) |
| self.assert_('cmis:objectId' in testDoc.getProperties()) |
| self.assert_('cmis:objectId' in pwcDoc.getProperties()) |
| testDoc = pwcDoc.checkin() |
| self.assertFalse(testDoc.isCheckedOut()) |
| finally: |
| if testDoc.isCheckedOut(): |
| pwcDoc.delete() |
| |
| def testCheckinComment(self): |
| '''Checkin a document with a comment''' |
| testFilename = settings.TEST_BINARY_1 |
| contentFile = open(testFilename, 'rb') |
| testDoc = self._testFolder.createDocument(testFilename, contentFile=contentFile) |
| contentFile.close() |
| self.assertEquals(testFilename, testDoc.getName()) |
| if (not testDoc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwcDoc = testDoc.checkout() |
| |
| try: |
| self.assertTrue(testDoc.isCheckedOut()) |
| testDoc = pwcDoc.checkin(checkinComment='Just a few changes') |
| self.assertFalse(testDoc.isCheckedOut()) |
| self.assertEquals('Just a few changes', |
| testDoc.getProperties()['cmis:checkinComment']) |
| finally: |
| if testDoc.isCheckedOut(): |
| pwcDoc.delete() |
| |
| def testCheckinAfterGetPWC(self): |
| '''Create a document in a test folder, check it out, call getPWC, then checkin''' |
| if not self._repo.getCapabilities()['PWCUpdatable'] == True: |
| print 'Repository does not support PWCUpdatable, skipping' |
| return |
| |
| testFilename = settings.TEST_BINARY_1 |
| contentFile = open(testFilename, 'rb') |
| testDoc = self._testFolder.createDocument(testFilename, contentFile=contentFile) |
| contentFile.close() |
| self.assertEquals(testFilename, testDoc.getName()) |
| # Alfresco has a bug where if you get the PWC this way |
| # the checkin will not be successful |
| if (not testDoc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| testDoc.checkout() |
| pwcDoc = testDoc.getPrivateWorkingCopy() |
| try: |
| self.assertTrue(testDoc.isCheckedOut()) |
| self.assert_('cmis:objectId' in testDoc.getProperties()) |
| self.assert_('cmis:objectId' in pwcDoc.getProperties()) |
| testDoc = pwcDoc.checkin() |
| self.assertFalse(testDoc.isCheckedOut()) |
| finally: |
| if testDoc.isCheckedOut(): |
| pwcDoc.delete() |
| |
| def testCancelCheckout(self): |
| '''Create a document in a test folder, check it out, then cancel |
| checkout''' |
| newDoc = self._testFolder.createDocument('testDocument') |
| if (not newDoc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwcDoc = newDoc.checkout() |
| try: |
| self.assertTrue(newDoc.isCheckedOut()) |
| self.assert_('cmis:objectId' in newDoc.getProperties()) |
| self.assert_('cmis:objectId' in pwcDoc.getProperties()) |
| checkedOutDocs = self._repo.getCollection('checkedout') |
| self.assertTrue(isInResultSet(checkedOutDocs, pwcDoc)) |
| finally: |
| pwcDoc.delete() |
| self.assertFalse(newDoc.isCheckedOut()) |
| checkedOutDocs = self._repo.getCollection('checkedout') |
| self.assertFalse(isInResultSet(checkedOutDocs, pwcDoc)) |
| |
| def testDeleteDocument(self): |
| '''Create a document in a test folder, then delete it''' |
| newDoc = self._testFolder.createDocument('testDocument') |
| children = self._testFolder.getChildren() |
| self.assertEquals(1, len(children.getResults())) |
| newDoc.delete() |
| children = self._testFolder.getChildren() |
| self.assertEquals(0, len(children.getResults())) |
| |
| def testGetLatestVersion(self): |
| '''Get latest version of an object''' |
| f = open(settings.TEST_BINARY_1, 'rb') |
| doc10 = self._testFolder.createDocument(settings.TEST_BINARY_1, contentFile=f) |
| if (not doc10.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc10.checkout() |
| doc11 = pwc.checkin(major='false') # checkin a minor version, 1.1 |
| if (not doc11.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc11.checkout() |
| doc20 = pwc.checkin() # checkin a major version, 2.0 |
| doc20Id = doc20.getObjectId() |
| if (not doc20.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc20.checkout() |
| doc21 = pwc.checkin(major='false') # checkin a minor version, 2.1 |
| doc21Id = doc21.getObjectId() |
| |
| docLatest = doc10.getLatestVersion() |
| self.assertEquals(doc21Id, docLatest.getObjectId()) |
| |
| docLatestMajor = doc10.getLatestVersion(major='true') |
| self.assertEquals(doc20Id, docLatestMajor.getObjectId()) |
| |
| def testGetPropertiesOfLatestVersion(self): |
| '''Get properties of latest version of an object''' |
| f = open(settings.TEST_BINARY_1, 'rb') |
| doc10 = self._testFolder.createDocument(settings.TEST_BINARY_1, contentFile=f) |
| if (not doc10.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc10.checkout() |
| doc11 = pwc.checkin(major='false') # checkin a minor version, 1.1 |
| if (not doc11.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc11.checkout() |
| doc20 = pwc.checkin() # checkin a major version, 2.0 |
| # what comes back from a checkin may not include all props, so reload |
| doc20.reload() |
| doc20Label = doc20.getProperties()['cmis:versionLabel'] |
| if (not doc20.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc20.checkout() |
| doc21 = pwc.checkin(major='false') # checkin a minor version, 2.1 |
| # what comes back from a checkin may not include all props, so reload |
| doc21.reload() |
| doc21Label = doc21.getProperties()['cmis:versionLabel'] |
| |
| propsLatest = doc10.getPropertiesOfLatestVersion() |
| self.assertEquals(doc21Label, propsLatest['cmis:versionLabel']) |
| |
| propsLatestMajor = doc10.getPropertiesOfLatestVersion(major='true') |
| self.assertEquals(doc20Label, propsLatestMajor['cmis:versionLabel']) |
| |
| def testGetProperties(self): |
| '''Create a document in a test folder, then get its properties''' |
| newDoc = self._testFolder.createDocument('testDocument') |
| self.assertEquals('testDocument', newDoc.getName()) |
| self.assertTrue('cmis:objectTypeId' in newDoc.getProperties()) |
| self.assertTrue('cmis:objectId' in newDoc.getProperties()) |
| |
| def testAllowableActions(self): |
| '''Create document in a test folder, then get its allowable actions''' |
| newDoc = self._testFolder.createDocument('testDocument') |
| actions = newDoc.getAllowableActions() |
| self.assert_(len(actions) > 0) |
| |
| def testUpdateProperties(self): |
| '''Create a document in a test folder, then update its properties''' |
| newDoc = self._testFolder.createDocument('testDocument') |
| self.assertEquals('testDocument', newDoc.getName()) |
| props = {'cmis:name': 'testDocument2'} |
| newDoc.updateProperties(props) |
| self.assertEquals('testDocument2', newDoc.getName()) |
| |
| def testSetContentStreamPWC(self): |
| '''Set the content stream on the PWC''' |
| if self._repo.getCapabilities()['ContentStreamUpdatability'] == 'none': |
| print 'This repository does not allow content stream updates, skipping' |
| return |
| |
| testFile1 = settings.TEST_BINARY_1 |
| testFile1Size = os.path.getsize(testFile1) |
| exportFile1 = testFile1.replace('.', 'export.') |
| testFile2 = settings.TEST_BINARY_2 |
| testFile2Size = os.path.getsize(testFile2) |
| exportFile2 = testFile1.replace('.', 'export.') |
| |
| # create a test document |
| contentFile = open(testFile1, 'rb') |
| newDoc = self._testFolder.createDocument(testFile1, contentFile=contentFile) |
| contentFile.close() |
| |
| # export the test document |
| result = newDoc.getContentStream() |
| outfile = open(exportFile1, 'wb') |
| outfile.write(result.read()) |
| result.close() |
| outfile.close() |
| |
| # the file we exported should be the same size as the file we |
| # originally created |
| self.assertEquals(testFile1Size, os.path.getsize(exportFile1)) |
| |
| # checkout the file |
| if (not newDoc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = newDoc.checkout() |
| |
| # update the PWC with a new file |
| f = open(testFile2, 'rb') |
| pwc.setContentStream(f) |
| f.close() |
| |
| # checkin the PWC |
| newDoc = pwc.checkin() |
| |
| # export the checked in document |
| result = newDoc.getContentStream() |
| outfile = open(exportFile2, 'wb') |
| outfile.write(result.read()) |
| result.close() |
| outfile.close() |
| |
| # the file we exported should be the same size as the file we |
| # checked in after updating the PWC |
| self.assertEquals(testFile2Size, os.path.getsize(exportFile2)) |
| os.remove(exportFile2) |
| |
| def testSetContentStreamPWCMimeType(self): |
| '''Check the mimetype after the PWC checkin''' |
| if self._repo.getCapabilities()['ContentStreamUpdatability'] == 'none': |
| print 'This repository does not allow content stream updates, skipping' |
| return |
| |
| testFile1 = settings.TEST_BINARY_1 |
| |
| # create a test document |
| contentFile = open(testFile1, 'rb') |
| newDoc = self._testFolder.createDocument(testFile1, contentFile=contentFile) |
| origMimeType = newDoc.properties['cmis:contentStreamMimeType'] |
| contentFile.close() |
| |
| # checkout the file |
| if (not newDoc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = newDoc.checkout() |
| |
| # update the PWC with a new file |
| f = open(testFile1, 'rb') |
| pwc.setContentStream(f) |
| f.close() |
| |
| # checkin the PWC |
| newDoc = pwc.checkin() |
| |
| # CMIS-231 the checked in doc should have the same mime type as |
| # the original document |
| self.assertEquals(origMimeType, |
| newDoc.properties['cmis:contentStreamMimeType']) |
| |
| def testSetContentStreamDoc(self): |
| '''Set the content stream on a doc that's not checked out''' |
| if self._repo.getCapabilities()['ContentStreamUpdatability'] != 'anytime': |
| print 'This repository does not allow content stream updates on the doc, skipping' |
| return |
| |
| testFile1 = settings.TEST_BINARY_1 |
| testFile1Size = os.path.getsize(testFile1) |
| exportFile1 = testFile1.replace('.', 'export.') |
| testFile2 = settings.TEST_BINARY_2 |
| testFile2Size = os.path.getsize(testFile2) |
| exportFile2 = testFile1.replace('.', 'export.') |
| |
| # create a test document |
| contentFile = open(testFile1, 'rb') |
| newDoc = self._testFolder.createDocument(testFile1, contentFile=contentFile) |
| contentFile.close() |
| |
| # export the test document |
| result = newDoc.getContentStream() |
| outfile = open(exportFile1, 'wb') |
| outfile.write(result.read()) |
| result.close() |
| outfile.close() |
| |
| # the file we exported should be the same size as the file we |
| # originally created |
| self.assertEquals(testFile1Size, os.path.getsize(exportFile1)) |
| |
| # update the PWC with a new file |
| f = open(testFile2, 'rb') |
| newDoc.setContentStream(f) |
| f.close() |
| |
| # export the checked in document |
| result = newDoc.getContentStream() |
| outfile = open(exportFile2, 'wb') |
| outfile.write(result.read()) |
| result.close() |
| outfile.close() |
| |
| # the file we exported should be the same size as the file we |
| # checked in after updating the PWC |
| self.assertEquals(testFile2Size, os.path.getsize(exportFile2)) |
| os.remove(exportFile2) |
| |
| def testDeleteContentStreamPWC(self): |
| '''Delete the content stream of a PWC''' |
| if self._repo.getCapabilities()['ContentStreamUpdatability'] == 'none': |
| print 'This repository does not allow content stream updates, skipping' |
| return |
| if not self._repo.getCapabilities()['PWCUpdatable'] == True: |
| print 'Repository does not support PWCUpdatable, skipping' |
| return |
| |
| # create a test document |
| contentFile = open(settings.TEST_BINARY_1, 'rb') |
| newDoc = self._testFolder.createDocument(settings.TEST_BINARY_1, contentFile=contentFile) |
| contentFile.close() |
| if (not newDoc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = newDoc.checkout() |
| pwc.deleteContentStream() |
| self.assertRaises(CmisException, pwc.getContentStream) |
| pwc.delete() |
| |
| def testCreateDocumentBinary(self): |
| '''Create a binary document using a file from the file system''' |
| testFilename = settings.TEST_BINARY_1 |
| contentFile = open(testFilename, 'rb') |
| newDoc = self._testFolder.createDocument(testFilename, contentFile=contentFile) |
| contentFile.close() |
| self.assertEquals(testFilename, newDoc.getName()) |
| |
| # test to make sure the file we get back is the same length |
| # as the file we sent |
| result = newDoc.getContentStream() |
| exportFilename = testFilename.replace('.', 'export.') |
| outfile = open(exportFilename, 'wb') |
| outfile.write(result.read()) |
| result.close() |
| outfile.close() |
| self.assertEquals(os.path.getsize(testFilename), |
| os.path.getsize(exportFilename)) |
| |
| # cleanup |
| os.remove(exportFilename) |
| |
| def testCreateDocumentFromString(self): |
| '''Create a new document from a string''' |
| documentName = 'testDocument' |
| contentString = 'Test content string' |
| newDoc = self._testFolder.createDocumentFromString(documentName, |
| contentString=contentString, contentType='text/plain') |
| self.assertEquals(documentName, newDoc.getName()) |
| self.assertEquals(newDoc.getContentStream().read(), contentString) |
| |
| def testCreateDocumentPlain(self): |
| '''Create a plain document using a file from the file system''' |
| testFilename = 'plain.txt' |
| testFile = open(testFilename, 'w') |
| testFile.write('This is a sample text file line 1.\n') |
| testFile.write('This is a sample text file line 2.\n') |
| testFile.write('This is a sample text file line 3.\n') |
| testFile.close() |
| contentFile = open(testFilename, 'r') |
| newDoc = self._testFolder.createDocument(testFilename, contentFile=contentFile) |
| contentFile.close() |
| self.assertEquals(testFilename, newDoc.getName()) |
| |
| # test to make sure the file we get back is the same length as the |
| # file we sent |
| result = newDoc.getContentStream() |
| exportFilename = testFilename.replace('txt', 'export.txt') |
| outfile = open(exportFilename, 'w') |
| outfile.write(result.read()) |
| result.close() |
| outfile.close() |
| self.assertEquals(os.path.getsize(testFilename), |
| os.path.getsize(exportFilename)) |
| |
| # export |
| os.remove(exportFilename) |
| os.remove(testFilename) |
| |
| def testGetAllVersions(self): |
| '''Get all versions of an object''' |
| testDoc = self._testFolder.createDocument('testdoc') |
| if (not testDoc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = testDoc.checkout() |
| doc = pwc.checkin() # 2.0 |
| if (not doc.allowableActions['canCheckOut']): |
| print 'The test doc cannot be checked out...skipping' |
| return |
| pwc = doc.checkout() |
| doc = pwc.checkin() # 3.0 |
| # what comes back from a checkin may not include all props, so reload |
| doc.reload() |
| self.assertEquals('3.0', doc.getProperties()['cmis:versionLabel']) |
| rs = doc.getAllVersions() |
| self.assertEquals(3, len(rs.getResults())) |
| # for count in range(0, 3): |
| # if count == 0: |
| # self.assertEquals('true', |
| # rs.getResults().values()[count].getProperties()['cmis:isLatestVersion']) |
| # else: |
| # self.assertEquals('false', |
| # rs.getResults().values()[count].getProperties()['cmis:isLatestVersion']) |
| |
| def testGetObjectParents(self): |
| '''Gets all object parents of an CmisObject''' |
| childFolder = self._testFolder.createFolder('parentTest') |
| parentFolder = childFolder.getObjectParents().getResults()[0] |
| self.assertEquals(self._testFolder.getObjectId(), parentFolder.getObjectId()) |
| |
| def testGetObjectParentsWithinRootFolder(self): |
| '''Gets all object parents of a root folder''' |
| rootFolder = self._repo.getRootFolder() |
| self.assertRaises(NotSupportedException, rootFolder.getObjectParents) |
| |
| def testGetObjectParentsMultiple(self): |
| '''Gets all parents of a multi-filed object''' |
| if not self._repo.getCapabilities()['Multifiling']: |
| print 'This repository does not allow multifiling, skipping' |
| return |
| |
| subFolder1 = self._testFolder.createFolder('sub1') |
| doc = subFolder1.createDocument('testdoc1') |
| self.assertEquals(len(subFolder1.getChildren()), 1) |
| subFolder2 = self._testFolder.createFolder('sub2') |
| self.assertEquals(len(subFolder2.getChildren()), 0) |
| subFolder2.addObject(doc) |
| self.assertEquals(len(subFolder2.getChildren()), 1) |
| self.assertEquals(subFolder1.getChildren()[0].name, subFolder2.getChildren()[0].name) |
| parentNames = ['sub1', 'sub2'] |
| for parent in doc.getObjectParents(): |
| parentNames.remove(parent.name) |
| self.assertEquals(len(parentNames), 0) |
| |
| def testGetPaths(self): |
| '''Get the paths of a document''' |
| testDoc = self._testFolder.createDocument('testdoc') |
| # ask the test doc for its paths |
| paths = testDoc.getPaths() |
| self.assertTrue(len(paths) >= 1) |
| |
| |
| class TypeTest(unittest.TestCase): |
| |
| """ |
| Tests for the :class:`ObjectType` class (and related methods in the |
| :class:`Repository` class. |
| """ |
| |
| def testTypeDescendants(self): |
| '''Get the descendant types of the repository.''' |
| |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| repo = cmisClient.getDefaultRepository() |
| typeDefs = repo.getTypeDescendants() |
| folderDef = None |
| for typeDef in typeDefs: |
| if typeDef.getTypeId() == 'cmis:folder': |
| folderDef = typeDef |
| break |
| self.assertTrue(folderDef) |
| self.assertTrue(folderDef.baseId) |
| |
| def testTypeChildren(self): |
| '''Get the child types for this repository and make sure cmis:folder |
| is in the list.''' |
| |
| #This test would be more interesting if there was a standard way to |
| #deploy a custom model. Then we could look for custom types. |
| |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| repo = cmisClient.getDefaultRepository() |
| typeDefs = repo.getTypeChildren() |
| folderDef = None |
| for typeDef in typeDefs: |
| if typeDef.getTypeId() == 'cmis:folder': |
| folderDef = typeDef |
| break |
| self.assertTrue(folderDef) |
| self.assertTrue(folderDef.baseId) |
| |
| def testTypeDefinition(self): |
| '''Get the cmis:document type and test a few props of the type.''' |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| repo = cmisClient.getDefaultRepository() |
| docTypeDef = repo.getTypeDefinition('cmis:document') |
| self.assertEquals('cmis:document', docTypeDef.getTypeId()) |
| self.assertTrue(docTypeDef.baseId) |
| |
| def testTypeProperties(self): |
| '''Get the properties for a type.''' |
| cmisClient = CmisClient(settings.REPOSITORY_URL, settings.USERNAME, settings.PASSWORD, **settings.EXT_ARGS) |
| repo = cmisClient.getDefaultRepository() |
| docTypeDef = repo.getTypeDefinition('cmis:document') |
| self.assertEquals('cmis:document', docTypeDef.getTypeId()) |
| props = docTypeDef.getProperties().values() |
| self.assertTrue(len(props) > 0) |
| for prop in props: |
| if prop.queryable: |
| self.assertTrue(prop.queryName) |
| self.assertTrue(prop.propertyType) |
| |
| |
| class ACLTest(CmisTestBase): |
| |
| """ |
| Tests related to :class:`ACL` and :class:`ACE` |
| """ |
| |
| def testSupportedPermissions(self): |
| '''Test the value of supported permissions enum''' |
| if not self._repo.getCapabilities()['ACL']: |
| print messages.NO_ACL_SUPPORT |
| return |
| self.assertTrue(self._repo.getSupportedPermissions() in ['basic', 'repository', 'both']) |
| |
| def testPermissionDefinitions(self): |
| '''Test the list of permission definitions''' |
| if not self._repo.getCapabilities()['ACL']: |
| print messages.NO_ACL_SUPPORT |
| return |
| supportedPerms = self._repo.getPermissionDefinitions() |
| self.assertTrue(supportedPerms.has_key('cmis:write')) |
| |
| def testPermissionMap(self): |
| '''Test the permission mapping''' |
| if not self._repo.getCapabilities()['ACL']: |
| print messages.NO_ACL_SUPPORT |
| return |
| permMap = self._repo.getPermissionMap() |
| self.assertTrue(permMap.has_key('canGetProperties.Object')) |
| self.assertTrue(len(permMap['canGetProperties.Object']) > 0) |
| |
| def testPropagation(self): |
| '''Test the propagation setting''' |
| if not self._repo.getCapabilities()['ACL']: |
| print messages.NO_ACL_SUPPORT |
| return |
| self.assertTrue(self._repo.getPropagation() in ['objectonly', 'propagate', 'repositorydetermined']) |
| |
| def testGetObjectACL(self): |
| '''Test getting an object's ACL''' |
| if not self._repo.getCapabilities()['ACL']: |
| print messages.NO_ACL_SUPPORT |
| return |
| acl = self._testFolder.getACL() |
| for entry in acl.getEntries().values(): |
| self.assertTrue(entry.principalId) |
| self.assertTrue(entry.permissions) |
| |
| def testApplyACL(self): |
| '''Test updating an object's ACL''' |
| if not self._repo.getCapabilities()['ACL']: |
| print messages.NO_ACL_SUPPORT |
| return |
| if not self._repo.getCapabilities()['ACL'] == 'manage': |
| print 'Repository does not support manage ACL' |
| return |
| if not self._repo.getSupportedPermissions() in ['both', 'basic']: |
| print 'Repository needs to support either both or basic permissions for this test' |
| return |
| acl = self._testFolder.getACL() |
| acl.addEntry(ACE(settings.TEST_PRINCIPAL_ID, 'cmis:write', 'true')) |
| acl = self._testFolder.applyACL(acl) |
| # would be good to check that the permission we get back is what we set |
| # but at least one server (Alf) appears to map the basic perm to a |
| # repository-specific perm |
| self.assertTrue(acl.getEntries().has_key(settings.TEST_PRINCIPAL_ID)) |
| |
| |
| def isInCollection(collection, targetDoc): |
| ''' |
| Util function that searches a list of objects for a matching target |
| object. |
| ''' |
| for doc in collection: |
| # hacking around a bizarre thing in Alfresco which is that when the |
| # PWC comes back it has an object ID of say 123ABC but when you look |
| # in the checked out collection the object ID of the PWC is now |
| # 123ABC;1.0. What is that ;1.0? I don't know, but object IDs are |
| # supposed to be immutable so I'm not sure what's going on there. |
| if doc.getObjectId().startswith(targetDoc.getObjectId()): |
| return True |
| return False |
| |
| |
| def isInResultSet(resultSet, targetDoc): |
| """ |
| Util function that searches a :class:`ResultSet` for a specified target |
| object. Note that this function will do a getNext on every page of the |
| result set until it finds what it is looking for or reaches the end of |
| the result set. For every item in the result set, the properties |
| are retrieved. Long story short: this could be an expensive call. |
| """ |
| done = False |
| while not done: |
| if resultSet.hasObject(targetDoc.getObjectId()): |
| return True |
| if resultSet.hasNext(): |
| resultSet.getNext() |
| else: |
| done = True |
| |
| if __name__ == "__main__": |
| #unittest.main() |
| tts = TestSuite() |
| #tts.addTests(TestLoader().loadTestsFromName('testGetObjectByPath', RepositoryTest)) |
| #unittest.TextTestRunner().run(tts) |
| #import sys; sys.exit(0) |
| |
| tts.addTests(TestLoader().loadTestsFromTestCase(CmisClientTest)) |
| tts.addTests(TestLoader().loadTestsFromTestCase(RepositoryTest)) |
| tts.addTests(TestLoader().loadTestsFromTestCase(FolderTest)) |
| tts.addTests(TestLoader().loadTestsFromTestCase(DocumentTest)) |
| tts.addTests(TestLoader().loadTestsFromTestCase(TypeTest)) |
| tts.addTests(TestLoader().loadTestsFromTestCase(ACLTest)) |
| tts.addTests(TestLoader().loadTestsFromTestCase(ChangeEntryTest)) |
| |
| # tts.addTests(TestLoader().loadTestsFromName('testCreateDocumentFromString', RepositoryTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testCreateDocumentFromString', DocumentTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testMoveDocument', RepositoryTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testCreateDocumentBinary', DocumentTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testCreateDocumentPlain', DocumentTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testAddObject', FolderTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testRemoveObject', FolderTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testFolderLeadingDot', FolderTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testFolderTrailingDot', FolderTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testGetObjectParents', DocumentTest)) |
| # tts.addTests(TestLoader().loadTestsFromName('testGetObjectParentsMultiple', DocumentTest)) |
| |
| # WARNING: Potentially long-running tests |
| |
| # Query tests |
| #tts.addTests(TestLoader().loadTestsFromTestCase(QueryTest)) |
| #tts.addTest(QueryTest('testPropertyMatch')) |
| #tts.addTest(QueryTest('testFullText')) |
| #tts.addTest(QueryTest('testScore')) |
| #tts.addTest(QueryTest('testWildcardPropertyMatch')) |
| #tts.addTest(QueryTest('testSimpleSelect')) |
| |
| unittest.TextTestRunner().run(tts) |