# This file is part of Radicale Server - Calendar Server # Copyright © 2012-2017 Guillaume Ayoub # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Radicale. If not, see . """ Radicale tests with simple requests. """ import base64 import os import posixpath import shutil import tempfile import xml.etree.ElementTree as ET from functools import partial import pytest from radicale import Application, config from . import BaseTest from .helpers import get_file_content class BaseRequestsMixIn: """Tests with simple requests.""" def test_root(self): """GET request at "/".""" status, _, answer = self.request("GET", "/") assert status == 302 assert answer == "Redirected to .web" def test_script_name(self): """GET request at "/" with SCRIPT_NAME.""" status, _, answer = self.request("GET", "/", SCRIPT_NAME="/radicale") assert status == 302 assert answer == "Redirected to .web" status, _, answer = self.request("GET", "", SCRIPT_NAME="/radicale") assert status == 302 assert answer == "Redirected to radicale/.web" def test_add_event(self): """Add an event.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") path = "/calendar.ics/event1.ics" status, _, _ = self.request("PUT", path, event) assert status == 201 status, headers, answer = self.request("GET", path) assert status == 200 assert "ETag" in headers assert headers["Content-Type"] == "text/calendar; charset=utf-8" assert "VEVENT" in answer assert "Event" in answer assert "UID:event" in answer def test_add_event_without_uid(self): """Add an event without UID.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics").replace("UID:event1\n", "") assert "\nUID:" not in event path = "/calendar.ics/event.ics" status, _, _ = self.request("PUT", path, event) assert status == 201 status, _, answer = self.request("GET", path) assert status == 200 uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) assert len(uids) == 1 and uids[0] # Overwrite the event with an event without UID and check that the UID # is still the same status, _, _ = self.request("PUT", path, event) assert status == 201 status, _, answer = self.request("GET", path) assert status == 200 assert "\r\nUID:%s\r\n" % uids[0] in answer def test_add_todo(self): """Add a todo.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 todo = get_file_content("todo1.ics") path = "/calendar.ics/todo1.ics" status, _, _ = self.request("PUT", path, todo) assert status == 201 status, headers, answer = self.request("GET", path) assert status == 200 assert "ETag" in headers assert headers["Content-Type"] == "text/calendar; charset=utf-8" assert "VTODO" in answer assert "Todo" in answer assert "UID:todo" in answer def _create_addressbook(self, path): return self.request( "MKCOL", path, """\ """) def test_add_contact(self): """Add a contact.""" status, _, _ = self._create_addressbook("/contacts.vcf/") assert status == 201 contact = get_file_content("contact1.vcf") path = "/contacts.vcf/contact.vcf" status, _, _ = self.request("PUT", path, contact) assert status == 201 status, headers, answer = self.request("GET", path) assert status == 200 assert "ETag" in headers assert headers["Content-Type"] == "text/vcard; charset=utf-8" assert "VCARD" in answer assert "UID:contact1" in answer status, _, answer = self.request("GET", path) assert status == 200 assert "UID:contact1" in answer def test_add_contact_without_uid(self): """Add a contact.""" status, _, _ = self._create_addressbook("/contacts.vcf/") assert status == 201 contact = get_file_content("contact1.vcf").replace("UID:contact1\n", "") assert "\nUID" not in contact path = "/contacts.vcf/contact.vcf" status, _, _ = self.request("PUT", path, contact) assert status == 201 status, _, answer = self.request("GET", path) assert status == 200 uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) assert len(uids) == 1 and uids[0] # Overwrite the contact with an contact without UID and check that the # UID is still the same status, _, _ = self.request("PUT", path, contact) assert status == 201 status, _, answer = self.request("GET", path) assert status == 200 assert "\r\nUID:%s\r\n" % uids[0] in answer def test_update(self): """Update an event.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") path = "/calendar.ics/event1.ics" status, _, _ = self.request("PUT", path, event) assert status == 201 status, headers, answer = self.request("GET", path) assert "ETag" in headers assert status == 200 assert "VEVENT" in answer assert "Event" in answer assert "UID:event" in answer assert "DTSTART;TZID=Europe/Paris:20130901T180000" in answer assert "DTEND;TZID=Europe/Paris:20130901T190000" in answer # Then we send another PUT request event = get_file_content("event1-prime.ics") status, _, _ = self.request("PUT", path, event) assert status == 201 status, _, answer = self.request("GET", "/calendar.ics/") assert status == 200 assert answer.count("BEGIN:VEVENT") == 1 status, headers, answer = self.request("GET", path) assert status == 200 assert "ETag" in headers assert "VEVENT" in answer assert "Event" in answer assert "UID:event" in answer assert "DTSTART;TZID=Europe/Paris:20130901T180000" not in answer assert "DTEND;TZID=Europe/Paris:20130901T190000" not in answer assert "DTSTART;TZID=Europe/Paris:20140901T180000" in answer assert "DTEND;TZID=Europe/Paris:20140901T210000" in answer def test_put_whole_calendar(self): """Create and overwrite a whole calendar.""" status, _, _ = self.request( "PUT", "/calendar.ics/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR") assert status == 201 event1 = get_file_content("event1.ics") status, _, _ = self.request( "PUT", "/calendar.ics/test_event.ics", event1) assert status == 201 # Overwrite events = get_file_content("event_multiple.ics") status, _, _ = self.request("PUT", "/calendar.ics/", events) assert status == 201 status, _, _ = self.request("GET", "/calendar.ics/test_event.ics") assert status == 404 status, _, answer = self.request("GET", "/calendar.ics/") assert status == 200 assert "\r\nUID:event\r\n" in answer and "\r\nUID:todo\r\n" in answer assert "\r\nUID:event1\r\n" not in answer def test_put_whole_calendar_without_uids(self): """Create a whole calendar without UID.""" event = get_file_content("event_multiple.ics") event = event.replace("UID:event\n", "").replace("UID:todo\n", "") assert "\nUID:" not in event status, _, _ = self.request("PUT", "/calendar.ics/", event) assert status == 201 status, _, answer = self.request("GET", "/calendar.ics") assert status == 200 uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) assert len(uids) == 2 for i, uid1 in enumerate(uids): assert uid1 for uid2 in uids[i + 1:]: assert uid1 != uid2 def test_put_whole_addressbook(self): """Create and overwrite a whole addressbook.""" contacts = get_file_content("contact_multiple.vcf") status, _, _ = self.request("PUT", "/contacts.vcf/", contacts) assert status == 201 status, _, answer = self.request("GET", "/contacts.vcf/") assert status == 200 assert ("\r\nUID:contact1\r\n" in answer and "\r\nUID:contact2\r\n" in answer) def test_put_whole_addressbook_without_uids(self): """Create a whole addressbook without UID.""" contacts = get_file_content("contact_multiple.vcf") contacts = contacts.replace("UID:contact1\n", "").replace( "UID:contact2\n", "") assert "\nUID:" not in contacts status, _, _ = self.request("PUT", "/contacts.vcf/", contacts) assert status == 201 status, _, answer = self.request("GET", "/contacts.vcf") assert status == 200 uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) assert len(uids) == 2 for i, uid1 in enumerate(uids): assert uid1 for uid2 in uids[i + 1:]: assert uid1 != uid2 def test_delete(self): """Delete an event.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") path = "/calendar.ics/event1.ics" status, _, _ = self.request("PUT", path, event) assert status == 201 # Then we send a DELETE request status, _, answer = self.request("DELETE", path) assert status == 200 assert "href>%s/calendar.ics///%s%s%s" in answer status, _, answer = self.request( "PROPFIND", "/calendar.ics/event.ics", propfind) assert "" in answer def test_propfind_allprop(self): status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") status, _, _ = self.request("PUT", "/calendar.ics/event.ics", event) assert status == 201 propfind = get_file_content("allprop.xml") status, _, answer = self.request( "PROPFIND", "/calendar.ics/", propfind) assert "" in answer status, _, answer = self.request( "PROPFIND", "/calendar.ics/event.ics", propfind) assert "" in answer def test_proppatch(self): """Write a property and read it back.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 proppatch = get_file_content("proppatch1.xml") status, _, answer = self.request( "PROPPATCH", "/calendar.ics/", proppatch) assert status == 207 assert "calendar-color" in answer assert "200 OK#BADA55" in answer def test_put_whole_calendar_multiple_events_with_same_uid(self): """Add two events with the same UID.""" status, _, _ = self.request( "PUT", "/calendar.ics/", get_file_content("event2.ics")) assert status == 201 status, _, answer = self.request( "REPORT", "/calendar.ics/", """ """) assert status == 207 assert answer.count("") == 1 status, _, answer = self.request("GET", "/calendar.ics/") assert status == 200 assert answer.count("BEGIN:VEVENT") == 2 def _test_filter(self, filters, kind="event", test=None, items=(1,)): filter_template = "{}" if kind in ("event", "journal", "todo"): create_collection_fn = partial(self.request, "MKCALENDAR") path = "/calendar.ics/" filename_template = "{}{}.ics" namespace = "urn:ietf:params:xml:ns:caldav" report = "calendar-query" elif kind == "contact": create_collection_fn = self._create_addressbook if test: filter_template = '{{}}'.format( test) path = "/contacts.vcf/" filename_template = "{}{}.vcf" namespace = "urn:ietf:params:xml:ns:carddav" report = "addressbook-query" else: raise ValueError("Unsupported kind: %r" % kind) status, _, _ = self.request("DELETE", path) assert status in (200, 404) status, _, _ = create_collection_fn(path) assert status == 201 for i in items: filename = filename_template.format(kind, i) event = get_file_content(filename) status, _, _ = self.request( "PUT", posixpath.join(path, filename), event) assert status == 201 filters_text = "".join( filter_template.format(filter_) for filter_ in filters) status, _, answer = self.request( "REPORT", path, """ {2} """.format(namespace, report, filters_text)) assert status == 207 return answer def test_addressbook_empty_filter(self): self._test_filter([""], kind="contact") def test_addressbook_prop_filter(self): assert "href>/contacts.vcf/contact1.vcf es """], "contact") assert "href>/contacts.vcf/contact1.vcf es """], "contact") assert "href>/contacts.vcf/contact1.vcf a """], "contact") assert "href>/contacts.vcf/contact1.vcf test """], "contact") assert "href>/contacts.vcf/contact1.vcf tes """], "contact") assert "href>/contacts.vcf/contact1.vcf est """], "contact") assert "href>/contacts.vcf/contact1.vcf tes """], "contact") assert "href>/contacts.vcf/contact1.vcf est """], "contact") assert "href>/contacts.vcf/contact1.vcf est """], "contact") assert "href>/contacts.vcf/contact1.vcf tes """], "contact") def test_addressbook_prop_filter_any(self): assert "href>/contacts.vcf/contact1.vcf test test """], "contact", test="anyof") assert "href>/contacts.vcf/contact1.vcf a test """], "contact", test="anyof") assert "href>/contacts.vcf/contact1.vcf test test """], "contact") def test_addressbook_prop_filter_all(self): assert "href>/contacts.vcf/contact1.vcf tes est """], "contact", test="allof") assert "href>/contacts.vcf/contact1.vcf test test """], "contact", test="allof") def test_calendar_empty_filter(self): self._test_filter([""]) def test_calendar_tag_filter(self): """Report request with tag-based filter on calendar.""" assert "href>/calendar.ics/event1.ics"""]) def test_item_tag_filter(self): """Report request with tag-based filter on an item.""" assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_item_not_tag_filter(self): """Report request with tag-based is-not filter on an item.""" assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_item_prop_filter(self): """Report request with prop-based filter on an item.""" assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_item_not_prop_filter(self): """Report request with prop-based is-not filter on an item.""" assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_mutiple_filters(self): """Report request with multiple filters on an item.""" assert "href>/calendar.ics/event1.ics """, """ """]) assert "href>/calendar.ics/event1.ics """, """ """]) assert "href>/calendar.ics/event1.ics """]) def test_text_match_filter(self): """Report request with text-match filter on calendar.""" assert "href>/calendar.ics/event1.ics event """]) assert "href>/calendar.ics/event1.ics event """]) assert "href>/calendar.ics/event1.ics unknown """]) assert "href>/calendar.ics/event1.ics event """]) def test_param_filter(self): """Report request with param-filter on calendar.""" assert "href>/calendar.ics/event1.ics ACCEPTED """]) assert "href>/calendar.ics/event1.ics UNKNOWN """]) assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_time_range_filter_events(self): """Report request with time-range filter on events.""" answer = self._test_filter([""" """], "event", items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], "event", items=range(1, 6)) assert "href>/calendar.ics/event1.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=(6, 7, 8)) assert "href>/calendar.ics/event6.ics/calendar.ics/event7.ics/calendar.ics/event8.ics """], items=(6, 7, 8)) assert "href>/calendar.ics/event6.ics/calendar.ics/event7.ics/calendar.ics/event8.ics """], items=(6, 7, 8)) assert "href>/calendar.ics/event6.ics/calendar.ics/event7.ics/calendar.ics/event8.ics """], "event", items=(1, 2)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics """], "event", items=(1, 2)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics """], "event", items=(1, 2)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics """], "event", items=(1, 2)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics/calendar.ics/todo3.ics/calendar.ics/todo4.ics/calendar.ics/todo5.ics/calendar.ics/todo6.ics/calendar.ics/todo7.ics/calendar.ics/todo8.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics/calendar.ics/todo3.ics/calendar.ics/todo4.ics/calendar.ics/todo5.ics/calendar.ics/todo6.ics/calendar.ics/todo7.ics/calendar.ics/todo8.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo2.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo2.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo3.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo7.ics """], "todo", items=(1, 2)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics """], "todo", items=(1, 2)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics """], "todo", items=(1, 2)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics """], "todo", items=(1, 2)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics """], "journal", items=(1, 2)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics """], "journal", items=(1, 2)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics """) assert status == 207 assert "href>%s<" % event_path in answer def _report_sync_token(self, calendar_path, sync_token=None): sync_token_xml = ( "" % sync_token if sync_token else "") status, _, answer = self.request( "REPORT", calendar_path, """ %s """ % sync_token_xml) if sync_token and status == 412: return None, None assert status == 207 xml = ET.fromstring(answer) sync_token = xml.find("{DAV:}sync-token").text.strip() assert sync_token return sync_token, xml def test_report_sync_collection_no_change(self): """Test sync-collection report without modifying the collection""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) assert xml.find("{DAV:}response") is not None new_sync_token, xml = self._report_sync_token(calendar_path, sync_token) assert sync_token == new_sync_token assert xml.find("{DAV:}response") is None def test_report_sync_collection_add(self): """Test sync-collection report with an added item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not sync_token: pytest.skip("storage backend does not support sync-token") assert xml.find("{DAV:}response") is not None assert xml.find("{DAV:}response/{DAV:}status") is None def test_report_sync_collection_delete(self): """Test sync-collection report with a deleted item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) status, _, _ = self.request("DELETE", event_path) assert status == 200 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not sync_token: pytest.skip("storage backend does not support sync-token") assert "404" in xml.find("{DAV:}response/{DAV:}status").text def test_report_sync_collection_create_delete(self): """Test sync-collection report with a created and deleted item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 status, _, _ = self.request("DELETE", event_path) assert status == 200 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not sync_token: pytest.skip("storage backend does not support sync-token") assert "404" in xml.find("{DAV:}response/{DAV:}status").text def test_report_sync_collection_modify_undo(self): """Test sync-collection report with a modified and changed back item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event1 = get_file_content("event1.ics") event2 = get_file_content("event2.ics") event_path = posixpath.join(calendar_path, "event1.ics") status, _, _ = self.request("PUT", event_path, event1) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) status, _, _ = self.request("PUT", event_path, event2) assert status == 201 status, _, _ = self.request("PUT", event_path, event1) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not sync_token: pytest.skip("storage backend does not support sync-token") assert xml.find("{DAV:}response") is not None assert xml.find("{DAV:}response/{DAV:}status") is None def test_report_sync_collection_move(self): """Test sync-collection report a moved item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event = get_file_content("event1.ics") event1_path = posixpath.join(calendar_path, "event1.ics") event2_path = posixpath.join(calendar_path, "event2.ics") status, _, _ = self.request("PUT", event1_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) status, _, _ = self.request( "MOVE", event1_path, HTTP_DESTINATION=event2_path, HTTP_HOST="") assert status == 201 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not sync_token: pytest.skip("storage backend does not support sync-token") for response in xml.findall("{DAV:}response"): if response.find("{DAV:}status") is None: assert response.find("{DAV:}href").text == event2_path else: assert "404" in response.find("{DAV:}status").text assert response.find("{DAV:}href").text == event1_path def test_report_sync_collection_move_undo(self): """Test sync-collection report with a moved and moved back item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event = get_file_content("event1.ics") event1_path = posixpath.join(calendar_path, "event1.ics") event2_path = posixpath.join(calendar_path, "event2.ics") status, _, _ = self.request("PUT", event1_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) status, _, _ = self.request( "MOVE", event1_path, HTTP_DESTINATION=event2_path, HTTP_HOST="") assert status == 201 status, _, _ = self.request( "MOVE", event2_path, HTTP_DESTINATION=event1_path, HTTP_HOST="") assert status == 201 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not sync_token: pytest.skip("storage backend does not support sync-token") created = deleted = 0 for response in xml.findall("{DAV:}response"): if response.find("{DAV:}status") is None: assert response.find("{DAV:}href").text == event1_path created += 1 else: assert "404" in response.find("{DAV:}status").text assert response.find("{DAV:}href").text == event2_path deleted += 1 assert created == 1 and deleted == 1 def test_report_sync_collection_invalid_sync_token(self): """Test sync-collection report with an invalid sync token""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token( calendar_path, "http://radicale.org/ns/sync/INVALID") assert not sync_token def test_propfind_sync_token(self): """Retrieve the sync-token with a propfind request""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 new_sync_token, xml = self._report_sync_token(calendar_path, sync_token) assert sync_token != new_sync_token def test_propfind_same_as_sync_collection_sync_token(self): """Compare sync-token property with sync-collection sync-token""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) new_sync_token, xml = self._report_sync_token(calendar_path, sync_token) assert sync_token == new_sync_token def test_calendar_getcontenttype(self): """Test report request on an item""" status, _, _ = self.request("MKCALENDAR", "/test/") assert status == 201 for component in ("event", "todo", "journal"): event = get_file_content("{}1.ics".format(component)) status, _, _ = self.request("PUT", "/test/test.ics", event) assert status == 201 status, _, answer = self.request( "REPORT", "/test/", """ """) assert status == 207 assert ">text/calendar;charset=utf-8;component=V{}<".format( component.upper()) in answer def test_addressbook_getcontenttype(self): """Test report request on an item""" status, _, _ = self._create_addressbook("/test/") assert status == 201 contact = get_file_content("contact1.vcf") status, _, _ = self.request("PUT", "/test/test.vcf", contact) assert status == 201 status, _, answer = self.request( "REPORT", "/test/", """ """) assert status == 207 assert ">text/vcard;charset=utf-8<" in answer def test_authorization(self): authorization = "Basic " + base64.b64encode(b"user:").decode() status, _, answer = self.request( "PROPFIND", "/", """ """, HTTP_AUTHORIZATION=authorization) assert status == 207 assert "href>/user/<" in answer def test_authentication(self): """Test if server sends authentication request.""" self.configuration["auth"]["type"] = "htpasswd" self.configuration["auth"]["htpasswd_filename"] = os.devnull self.configuration["auth"]["htpasswd_encryption"] = "plain" self.configuration["rights"]["type"] = "owner_only" self.application = Application(self.configuration, self.logger) status, headers, _ = self.request("MKCOL", "/user/") assert status in (401, 403) assert headers.get("WWW-Authenticate") def test_principal_collection_creation(self): """Verify existence of the principal collection.""" status, _, _ = self.request("PROPFIND", "/user/", HTTP_AUTHORIZATION=( "Basic " + base64.b64encode(b"user:").decode())) assert status == 207 def test_existence_of_root_collections(self): """Verify that the root collection always exists.""" # Use PROPFIND because GET returns message status, _, _ = self.request("PROPFIND", "/") assert status == 207 # it should still exist after deletion status, _, _ = self.request("DELETE", "/") assert status == 200 status, _, _ = self.request("PROPFIND", "/") assert status == 207 def test_custom_headers(self): if not self.configuration.has_section("headers"): self.configuration.add_section("headers") self.configuration.set("headers", "test", "123") # Test if header is set on success status, headers, _ = self.request("OPTIONS", "/") assert status == 200 assert headers.get("test") == "123" # Test if header is set on failure status, headers, _ = self.request( "GET", "/.well-known/does not exist") assert status == 404 assert headers.get("test") == "123" def test_missing_uid(self): """Verify that missing UIDs are added in a stable manner.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event_without_uid = get_file_content("event1.ics").replace( "UID:event1\n", "") assert "UID" not in event_without_uid path = "/calendar.ics/event1.ics" status, _, _ = self.request("PUT", path, event_without_uid) assert status == 201 status, _, answer = self.request("GET", path) assert status == 200 uid = None for line in answer.split("\r\n"): if line.startswith("UID:"): uid = line[len("UID:"):] assert uid status, _, _ = self.request("PUT", path, event_without_uid) assert status == 201 status, _, answer = self.request("GET", path) assert status == 200 assert "UID:%s\r\n" % uid in answer class BaseFileSystemTest(BaseTest): """Base class for filesystem backend tests.""" storage_type = None def setup(self): self.configuration = config.load() self.configuration["storage"]["type"] = self.storage_type self.colpath = tempfile.mkdtemp() self.configuration["storage"]["filesystem_folder"] = self.colpath # Disable syncing to disk for better performance self.configuration["storage"]["filesystem_fsync"] = "False" # Required on Windows, doesn't matter on Unix self.configuration["storage"]["filesystem_close_lock_file"] = "True" self.application = Application(self.configuration, self.logger) def teardown(self): shutil.rmtree(self.colpath) class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): """Test BaseRequests on multifilesystem.""" storage_type = "multifilesystem" def test_fsync(self): """Create a directory and file with syncing enabled.""" self.configuration["storage"]["filesystem_fsync"] = "True" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 def test_hook(self): """Run hook.""" self.configuration["storage"]["hook"] = ( "mkdir %s" % os.path.join("collection-root", "created_by_hook")) status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 status, _, _ = self.request("PROPFIND", "/created_by_hook/") assert status == 207 def test_hook_read_access(self): """Verify that hook is not run for read accesses.""" self.configuration["storage"]["hook"] = ( "mkdir %s" % os.path.join("collection-root", "created_by_hook")) status, _, _ = self.request("PROPFIND", "/") assert status == 207 status, _, _ = self.request("PROPFIND", "/created_by_hook/") assert status == 404 @pytest.mark.skipif(os.system("type flock") != 0, reason="flock command not found") def test_hook_storage_locked(self): """Verify that the storage is locked when the hook runs.""" self.configuration["storage"]["hook"] = ( "flock -n .Radicale.lock || exit 0; exit 1") status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 def test_hook_principal_collection_creation(self): """Verify that the hooks runs when a new user is created.""" self.configuration["storage"]["hook"] = ( "mkdir %s" % os.path.join("collection-root", "created_by_hook")) status, _, _ = self.request("PROPFIND", "/", HTTP_AUTHORIZATION=( "Basic " + base64.b64encode(b"user:").decode())) assert status == 207 status, _, _ = self.request("PROPFIND", "/created_by_hook/") assert status == 207 def test_hook_fail(self): """Verify that a request fails if the hook fails.""" self.configuration["storage"]["hook"] = "exit 1" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status != 201 def test_item_cache_rebuild(self): """Delete the item cache and verify that it is rebuild.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") path = "/calendar.ics/event1.ics" status, _, _ = self.request("PUT", path, event) assert status == 201 status, _, answer1 = self.request("GET", path) assert status == 200 cache_folder = os.path.join(self.colpath, "collection-root", "calendar.ics", ".Radicale.cache", "item") assert os.path.exists(os.path.join(cache_folder, "event1.ics")) shutil.rmtree(cache_folder) status, _, answer2 = self.request("GET", path) assert status == 200 assert answer1 == answer2 assert os.path.exists(os.path.join(cache_folder, "event1.ics")) class TestCustomStorageSystem(BaseFileSystemTest): """Test custom backend loading.""" storage_type = "tests.custom.storage" def test_root(self): """A simple test to verify that the custom backend works.""" BaseRequestsMixIn.test_root(self)