обработка сообщений

iceman

говнокодер
Есть терминалы, которые отсылают информацию о себе в систему мониторинга, как HTTP GET запрос, данные закодировав в url.

Есть система мониторинга, которая должна принять сообщение, сохранить (не обрабатывая) и дать ответ терминалу, что получил сообщение и положил в бд, больше мне не шли его.

Вот задача теперь обработать эти сообщения.

Статистика такая: за сутки в таблице оказывается 40 000 записей, это от 200 терминалов, 27 записей/минуту,

Как обработать эти записи?

написать курсор, отсортируя по дате поступления, обработать каждую. и все это засунуть в DBMS_JOB?

джоб запускать каждую минуту? (желательно для того что бы данные поступали пользователю оперативно...)
но джоб возможно не успеет отработать за минуту, и запустится паралельно этот же джоб в новой сессии.

какие есть идеи?
 

iceman

говнокодер
вот еще 2 способа надумал:

1) либо при получение сообщения - создавать бекграунд процесс, который начнет обрабатывать это сообщение, что означает что таким процессов будет много.

2) либо это все на Java замутить, на Application Server (GlassFish), будет крутится приложение, сканить таблицу и если что то есть - обрабатывать данные. но тут в одном потоке гарантированно!

если сделать в много потоков. в пределах одной сессии - будет выборка №1 которая выберит 10 строк, обработает только 2, потом параллельно запустится еще 1 поток, который сделает выборку и получит уже 8 строк, и начнет обрабатывать дальше - не обработают ли они одну и ту же строчку 2 раза???
 

флоппик

promotor fidei
Команда форума
Партнер клуба
если сделать в много потоков. в пределах одной сессии - будет выборка №1 которая выберит 10 строк, обработает только 2, потом параллельно запустится еще 1 поток, который сделает выборку и получит уже 8 строк, и начнет обрабатывать дальше - не обработают ли они одну и ту же строчку 2 раза???
Помечай строки находящиеся в обработке.
 

prolis

Новичок
Я бы для начала поизучал вот это: 5*27 = 135 сек
5 сек для чего-бы то не было в операционной работе - уже многовато. Там 27 разных таблиц, почему нельзя все терминалы за один проход обработать?
Такое может быть если архивную пытаться архивную таблицу отсортировать по дате, потом полочить во время обработки и т.д.
Отбирать записи лучше по Ид: сначала узнаем максимальный обработанный, потом тянем из архива с Ид большим чем максимальный.
Самое простое - всё-таки уместить максимальное время обработки в 5 секунд в минуту, бОльшая оперативность конечным пользователям-гуманоидам ни к чему.
 

phprus

Moderator
Команда форума
iceman
А что с записями то нужно делать и откуда взялась константа 5? Можно-ли это оптимизировать?
У записей есть уникальный auto_increment код или аналогичный по сути параметр (например timestamp)?
Если да, то можно поделить записи на четные/нечетные (или по аналогии) и запускать несколько DBMS_JOB. Метод сработает, если Oracle может запускать разные JOB параллельно на разных ядрах.
Минус метода в том, что придется хранить не один ID последней обработанной записи, а отдельные ID на каждый JOB, чтобы ничего не потерять.
 

флоппик

promotor fidei
Команда форума
Партнер клуба
Можно поиграться с LOCK TABLE ... IN ROW EXCLUSIVE MODE внутри транзакции. Хотя элементарнее всего ввести статус отображающий что строка взята на обработку, это позволит иметь любое количество джобов.
 

флоппик

promotor fidei
Команда форума
Партнер клуба
iceman
У записей есть уникальный auto_increment?
В оракле нет автоинкремента, но есть триггер + сиквенс, что позволяет в принципе навесить его в любую таблицу, независимо от структуры, и не обязательно порядковый, можно в сиквенсе прямо назначать id джоба например.
 

iceman

говнокодер
флоппик
В оракле нет автоинкремента, но есть триггер + сиквенс, что позволяет в принципе навесить его в любую таблицу, независимо от структуры, и не обязательно порядковый, можно в сиквенсе прямо назначать id джоба наприме
триггера нету, сиквенс вставляю сразу в процедуре, которую дергает веб приложение на яве, получившая информацию от терминала.
раньше и обработка была сразу, но ее нужно исключить, процедура не успевает отрабатывать из за общей нагрузки на бд, и вместе с ним глассфиш, в итоге терминалам не возвращается 200 код ответа, они повторяют отсылку сообщения снова.

терь задача веб приложение - просто сохранить данные, обработку сделать позже.

prolis
Я бы для начала поизучал вот это: 5*27 = 135 сек
5 сек для чего-бы то не было в операционной работе - уже многовато. Там 27 разных таблиц, почему нельзя все терминалы за один проход обработать?
Такое может быть если архивную пытаться архивную таблицу отсортировать по дате, потом полочить во время обработки и т.д.
Отбирать записи лучше по Ид: сначала узнаем максимальный обработанный, потом тянем из архива с Ид большим чем максимальный.
Самое простое - всё-таки уместить максимальное время обработки в 5 секунд в минуту, бОльшая оперативность конечным пользователям-гуманоидам ни к чему.
какие еще 27 таблиц? 27 обращений от терминалов в минуту в среднем (за 5 рабочих дней)

phprus
ой сорри, половину сообщения отредактировал, забыл убрать 5. Было так - при нагруженной БД процедура отрабатывает больше 1-2 сек.
при не нагруженной - доли секунд

процедуру оптимизирую конечно, но вопрос в том, как лучше обработать данные, с учетом "даты поступления" этих записей в таблицу.
т.е. старшие записи обработать первее, чтобы соблюзти хроналогию событий и не нарушить алгоритмы обработки.


-------- ДОПОЛНИТЕЛЬНО ----------
секцинировал таблицу по полю DATETIME (время попадания записив таблицу)
но в запросах использую ограничение по дате trunc(DATETIME) = trunc(sysdate) (индекс так же по trunc(DATETIME))

в итоге смотрю на план, PARTITION RANGE ALL, убераю trunc - PARTITION RANGE ITERATOR, хочу чтобы с trunc оракл не сканил все партиции без нужды, но и ограничивать себя select from paririron N - не хочу.
 

phprus

Moderator
Команда форума
В оракле нет автоинкремента, но есть триггер + сиквенс, что позволяет в принципе навесить его в любую таблицу, независимо от структуры, и не обязательно порядковый, можно в сиквенсе прямо назначать id джоба например.
Название способа реализации концептуально что-то меняет? По этому с учетом того, что я знаю Oracle и работал с ним, я вынужден сослаться на пост #5. (Спрашивать про наличие уникального ключа, формируемого, например при помощи sequence и триггера - это писать значительно больше теста ни сколько не меняя суть вопроса)

iceman
ой сорри, половину сообщения отредактировал, забыл убрать 5. Было так - при нагруженной БД процедура отрабатывает больше 1-2 сек.
при не нагруженной - 0,032 сек.
Вот этого я не понял. 1-2 секунды это на одну запись? Если да, то тут уже все становится печальнее из-за того что просто суммарного CPU-time сервера на обработку записей в реальном времени может не хватить (сравни 1-2-3 с на обработку одной записи и поступление этих записей в среднем раз в 2 секунды).

процедуру оптимизирую конечно, но вопрос в том, как лучше обработать данные, с учетом "даты поступления" этих записей в таблицу.
Тут вопросов два.
Первый - можно ли успеть обрабатывать в реальном времени. Если да, то по крону выбираются необработанные записи и обрабатываются (выборка при необходимости сортируется). Сложности нет.
Второй - как распределить задачу по процессам, если в один поток не успеть. Один из вариантов балансировки предложил я, но в случае многопоточности гарантировать, что при обработки записи за время Х, все записи ранее Х были обработаны уже невозможно.
 

prolis

Новичок
Тогда такой алгоритм:
1. update table set somefield=id mod(Id, N) where somefield is null /or Id>processedId/ (делим необработанное по количеству потоков N)
2. select from table where somefield=N order by datetime (каждый поток забирает свои записи и обрабатывает)
3.update table set somefield='processed' where id=:Id (обозначаем окончание обработки)

это пока с локами можно не заморачиваться в одной сессии
 
Сверху