Подписка на данные, которые публикуются в PPS
Клиенты службы PPS могут подписываться на несколько объектов, а объекты PPS ― иметь нескольких подписчиков. Когда издатель изменяет объект, все клиенты, которые подписаны на него, получают уведомление об изменении.
Чтобы подписаться на объект, клиент вызывает функцию open() для файла объекта с флагом O_RDONLY
(только подписка) или O_RDWR
(публикация и подписка). После этого подписчик может выполнять чтение объекта с помощью функции read(), которая возвращает длину считанных данных в байтах. Если выделенный буфер недостаточен для приема считываемых данных, операция чтения завершается с ошибкой.
Содержание статьи:
По умолчанию чтение объектов PPS является неблокирующим, т.е. функция open() выполняется с флагом O_NONBLOCK
и операции чтения не блокируют клиента. Такое поведение нетипично для большинства файловых систем; оно предотвращает «зависание» стандартных утилит при вызове функции read() для файла.
Например, вы можете создать полный архив состояния PPS с помощью утилиты tar; если бы поведение PPS по умолчанию было иным, утилита tar смогла бы открыть и прочитать только первый файл.
Несмотря на то, что по умолчанию PPS открывает объекты для неблокирующих операций чтения, при опросе объектов PPS предпочтительно использовать блокирующие операции. Блокирующая операция чтения возвращает данные только после того, как изменяется объект или его атрибуты.
Чтобы включить выполнение блокирующих операций чтения, необходимо добавить суффикс ?wait к путевому имени объекта при его открытии с помощью функции open(). Пример:
/pps/media/PlayList
/pps/media/PlayList?wait
Информацию о параметре ?wait и других параметрах открытия путевых имен см. в главе Параметры и спецификаторы.
Обычно цикл подписчика находится в отдельном потоке. Если подписчик открыл объект с параметром ?wait, этот цикл может выглядеть следующим образом:
/* Предполагается, что объект открыт с использованием параметра ?waitВ этом примере не проверяются ошибки. */for ( ;; ) {read( fd, buf, sizeof( buf ) ); // функция read ждет изменения объекта.process( buf );}
Чтобы включить блокирующие операции чтения для объекта, который был открыт без использования параметра ?wait, можно сбросить флаг O_NONBLOCK
. Это приведет к тому, что подписчик будет ожидать изменения объекта или его атрибутов.
Для сброса указанного флага можно воспользоваться функцией fcntl(). Пример:
flags = fcntl( fd, F_GETFL );flags &= ~O_NONBLOCK;fcntl( fd, F_SETFL, flags );
Также можно использовать функцию ioctl():
int i = 0;ioctl( fd, FIONBIO, &i );
После очистки флага O_NONBLOCK
можно выполнить операцию read(), которая будет ожидать изменения объекта.
В службе PPS реализована функция io_notify(), с помощью которой подписчики могут запрашивать уведомления в виде импульса, сигнала, семафора и др. Чтобы считать содержимое объекта после получения уведомления о его изменении, необходимо вызвать функцию read() для его файла. Пример:
/* Обработка всех полученных событий */while ( ionotify( fd, _NOTIFY_ACTION_POLLARM, _NOTIFY_COND_INPUT, &event ) & _NOTIFY_CONT_INPUT ) {if ( read( fd, buf, sizeof( buf ) ) > 0 ) // желательно выполнять чтение без O_NONBLOCKprocess( buf );}/* позже будет сгенерировано событие-уведомление */
Можно принимать уведомления о наличии данных в открытом файле с помощью одного из двух простых механизмов:
O_NONBLOCK
с помощью функции fnctl() после вызова open().
Подписчик может открывать объект в полном режиме, дельта-режиме, а также одновременно в полном и дельта-режимах. По умолчанию используется полный режим. Чтобы открыть объект в дельта-режиме, необходимо добавить суффикс ?delta к его путевому имени в функции open().
Информацию о параметре ?delta и других параметрах открытия путевых имен см. в главе Параметры и спецификаторы.
В полном режиме (который используется по умолчанию) подписчик всегда работает с единой и целостной версией объекта, которая существует в момент доступа к нему.
Если издатель многократно изменяет объект до того, как подписчик запрашивает его, подписчик получает объект только в состоянии на момент запроса. Если объект снова изменяется, подписчик получает соответствующее уведомление. Таким образом, в полном режиме подписчик может пропускать изменения, предшествующие запросу объекта.
В дельта-режиме подписчик получает только все изменения атрибутов объекта.
Поскольку при первом чтении подписчик не обладает какой-либо информацией о состоянии объекта, служба PPS считает, что все его атрибуты изменились, и возвращает их подписчику. Последующие операции чтения возвращают подписчику только атрибуты, которые изменились с момента предыдущего чтения.
Таким образом, в дельта-режиме подписчик всегда получает все изменения объекта.
На следующем рисунке сравнивается информация, которая передается подписчикам объекта PPS в полном режиме и дельта-режиме.
Служба PPS создает объекты и хранит информацию об их состояниях в любом режиме. Режим открытия объекта не изменяет сам объект, а лишь определяет представление его изменений с точки зрения подписчика.
Когда подписчик открывает объект в дельта-режиме, служба PPS создает новую очередь для хранения изменений объекта. Если таких подписчиков несколько, каждый из них имеет собственную очередь, в которую служба PPS помещает копии изменений. Если ни один из подписчиков не открывает объект в дельта-режиме, служба PPS не создает для него очереди изменений.
![]() | При завершении работы служба PPS сохраняет объекты, но не сохраняет их дельта-очереди. |
Если издатель изменяет несколько атрибутов в одном вызове write(), служба PPS группирует эти изменения и возвращает их подписчику в вызове read(). Другими словами, PPS сохраняет порядок изменений и обеспечивает их атомарность. Пример:
write() write()time::1.23 time::1.24duration::4.2 write()duration::4.2read() read()@objname @objnametime::1.23 time:1.24duration::4.2 @objnameduration::4.2
Когда клиент выполняет запись в серверный объект, сообщение передается только приложению (которое называется сервером), создавшему этот объект с помощью параметра ?server Другие клиенты не получают это сообщение.
При записи служба PPS добавляет в конец имени объекта уникальный идентификатор, с помощью которого сервер определяет клиентское подключение, используемое для доставки сообщения. Это позволяет хранить информацию о состоянии подключения. Например, конструкция:
@foo.1234
обозначает объект с именем "foo" и клиентским идентификатором "1234". При подключении клиента сервер читает новый объект с префиксом "+" (например, "+@foo.1234"). При отключении клиента сервер получает сообщение, в котором в качестве префикса используется знак "-" вместо "+".
Чтобы передать ответное сообщение только одному клиенту, сервер должен указать его уникальный идентификатор после имени объекта; в противном случае сообщение передается всем клиентам, которые подключены к объекту.
Приложение, которое открывает объект с параметром ?server, автоматически становится его критическим издателем, а также получает уведомления в дельта-режиме.
На следующем рисунке показано взаимодействие со службой PPS с использованием параметра ?server:
Служба PPS поддерживает специальные объекты, которые обеспечивают клиентам возможность подписываться на несколько объектов:
.all
— позволяет получать уведомления об изменениях всех объектов соответствующего каталога. .notify
— позволяет получать уведомления об изменениях всех объектов, которые входят в состав группы уведомлений.
Каталоги являются естественным механизмом группировки, который облегчает подписку клиентов службы PPS на множество объектов. Подписчик может открывать несколько объектов с помощью функции open(), а затем проверять файловые дескрипторы посредством функции select(). Тем не менее, удобнее использовать специальный объект .all, который представляет все объекты каталога.
Например, допустим, что каталог /pps
содержит следующую структуру с файлами объектов:
rear/left/PlayCurrentrear/left/Timerear/left/PlayError
Открыв объект rear/left/.all
, клиент получает уведомления об изменениях любых объектов, которые находятся в каталоге rear/left
. Операция чтения в полном режиме возвращает не более одного объекта.
read()@Timeposition::18duration::300read()@PlayCurrentartist::The Beatlesgenre::Pop... полный набор атрибутов объекта
Тем не менее, клиент, который открывает объект .all в дельта-режиме, получает очередь с изменениями атрибутов всех объектов в каталоге. В этом случае один вызов read() может возвращать множество объектов.
read()@Timeposition::18@Timeposition::19@PlayCurrentartist::The Beatlesgenre::Pop
Служба PPS включает в себя механизм объединения нескольких файловых дескрипторов в группы. Этот механизм позволяет клиенту получать уведомления об изменениях любых объектов, входящих в состав группы, считывая только специальный объект уведомления PPS.
Чтобы создать группу объектов, выполните следующие действия:
.notify в корневом каталоге файловой системы службы PPS.
.notify; первая операция чтения вернет короткую строку (длиной менее 16 символов) с именем группы, в которую следует включить другие файловые дескрипторы. Чтобы включить файловый дескриптор в группу, при открытии файла следует указать в путевом имени параметр "?notify=group:value", где:
.notify
![]() | Возвращаемая строка с именем группы оканчивается символом перевода строки, который необходимо удалять перед ее использованием. |
Информацию о параметре ?notify и других параметрах открытия путевых имен см. в главе Параметры и спецификаторы.
После создания группы объектов и включения в нее файловых дескрипторов можно использовать эту группу для получения уведомлений об изменениях объектов, входящих в ее состав.
При появлении данных для чтения в каком-либо из файлов группы операция чтения файлового дескриптора объекта уведомлений возвращает строку, которая передается в параметре путевого имени "?notify=group:value".
Например, если служба PPS использует точку монтирования /pps
, можно написать следующий код:
char noid[16], buf[128];int notify_fd, fd1, fd2;notify_fd = open( "/pps/.notify", O_RDONLY );read( notify_fd, &noid[0], sizeof( noid ) );sprintf( buf, "/pps/fish?notify=%s:water", noid );fd1 = open( buf, O_RDONLY );sprintf( buf, "/pps/dir/birds?notify=%s:air", noid );fd2 = open( buf, O_RDONLY );while ( read( notify_fd, &buf, sizeof( buf ) > 0 ) {printf( "Notify %s\n", buf );}
Данные, которые выводятся в приведенном выше цикле while, выглядят следующим образом:
Notify 243:waterNotify 243:airNotify 243:waterNotify 243:air
![]() | При чтении объекта, который входит в состав группы, подписчику необходимо многократно вызывать функцию read() для каждого указанного изменения. В объект могут быть внесены несколько изменений, однако нет гарантии, что каждое изменение указывается в файловом дескрипторе группы. |
При закрытии файлового дескриптора объекта, входящего в состав группы, строка, которая передается в уведомлении об изменении, начинается со знака "-". Пример:
-243:air
Предыдущий раздел: Устойчивая служба публикации/подписки (PPS)