Синхронизация независимых баз данных (трансакция)

WMix

герр M:)ller
Партнер клуба
сразу оговорюсь вопрос не о репликациях.

сейчас столкнулся с проблемой синхронизации 2х различных баз данных / таблиц (различных в смысле количество данных или драйвера)

общение должно происходить всегда в одну сторону по средствам soap.
те. к примеру, исполнитель (имеющий данные нескольких заказчиков), опрашивает одного из заказчиков в попытке взять новые заказы.

заказы это табличные данные, по структуре у всех учасников одинаковые:
( id:"22-42", order:"сделай, а..." ).
при этом id всегда уникалелен для всех учасников

в обратную сторону тоже протекают данные. те исполнитель к примеру рапортует о проделанной работе.
(id:1-5041, order_id:22-42, desc: готово, invoice: EUR 3.45 ).

далее вопрос реализации. те понятно что опрос новых записей это where last_sync_time > now().
а как провести трансакцию. между двумя машинами. заказчик не должен иметь возможности обновлять данные на момент синхронизации и только когда данные записались они пометились синхронизированными.

клиент (исполнитель)
PHP:
$db->begin();
$db->set( $ws->get() );  // {id:"22-42", order:"сделай, а..."}
$ws->done( now() );
$db->commit();

// after a long time

$db->begin();
if( $time = $ws->set( $db->get() ){ // {id:1-5041, order_id:22-42, desc: готово, invoice: EUR 3.45}
$db->set('{last_sync_time:$time}');
$db->commit();
сервер (заказчик)
PHP:
$ws->on( 'get', function(){
  $db->set( '{synchronization:1}'); // начали синхронизацию, данные нельзя менять
  return $db->get();
});
$ws->on('set', function($data){
  $db->begin();
  $db->set($data)
  $db->commit();
  return now();
});
$ws->on('done', function($time){
  $db->set( '{synchronization:0, last_sync_time:$time}' );
});
а как назад откатиться если done небыл вызван?, кроном?
а как определить что время вышло?
а не слишком ли сложно?

ну и вопрос готовых библиотечек тоже очень актуален.

к примеру на сервере исполнителя стоит mysql / php. у заказчика mssql или postgre / php или js в связке с монго.
 
Последнее редактирование:

WMix

герр M:)ller
Партнер клуба
поглядел на написанное, вот, как написать на уровне вебсервисов, select for update?
 

MiksIr

miksir@home:~$
Самому реализовывать, конечно. Ставить флаги в базе и все такое.
Есть два типа блокировок - пессимистичная и оптимистичная.

Пессимистичная (exclusive lock) - select for update, при чтении ставишь флаг в базе и ждешь апдейта + таймаут. Таймаут чистить или кроном или просто в приложении при обращении на изменение.

Оптимистичная (cas) - если с memcached работал - сталкивался. При выдаче данных отдаешь, например, время последнего изменения, обратный апдейт возвращает это же время. Равно = хорошо, не равно - ошибка, исполнитель пробует еще раз.
 

WMix

герр M:)ller
Партнер клуба
прикольно, пусть будет мы пессимисты, хотя некоторые могут быть и оптимистами, от заказчика же зависит,
и пусть мы делаем свой велосипед

минимальное количество полей которое вижу

id,sync,begin_sync_time, .... любые данные

где id должен быть уникален
sync флаг, являются ли данные синхронизированными три значения: да, нет, идет синхронизация.
begin_sync_time чтоб сбросить.

а для того чтоб любой написанный sql порождал синхронизацию, добавим trigger который сбрасывает sync на "нет".
как его написать так, чтобы можно было написать update table set sync='yes' после всех проделанных операций?
 

AnrDaemon

Продвинутый новичок
WMix, даже странно читать такое от тебя :)
Абисняу сначала.
1. транзакция
2. участие
3. Транзакция - это алгоритм, последовательность действий, предпринимаемых клиентом и сервером для согласования целостности данных в процессе работы. Ключевое слово здесь - согласование. Ты не можешь ЗАСТАВИТЬ сервер чего-то не делать, ты можешь его только попросить об этом. А согласится он на твою просьбу или нет - …
4. Согласование многофазных операций (Сервер А(SOAP) запросил данные у сервера Б(SOAP), сервер Б запросил их у сервера Ц(SQL), сервер Ц отдал данные серверу Б, сервер Б отдал данные серверу А) требует программирования всех серверов для поддержки единого алгоритма синхронизации.
5. Ты делаешь ошибку, предполагая, что сервер Б должен помечать данные как синхронизированные при отдаче их серверу А. Не должен. Более того, его это вообще колебать не должно. (Надеюсь, то, что сервер Ц вообще не колебёт, что там два других дурака между собой делают, объяснять не нужно?)
В сухом остатке: твоя конкретная задача решается без всяких транзакций хранением таблицы "сервер//дата последнего ордера, полученного от этого сервера" и запросом у сервера ордеров с датами больше даты последнего полученного. Если запрос по каким-то причинам не пройдёт, дата не меняется, и запрос будет повторен через час или как там часто тебе надо.
 

WMix

герр M:)ller
Партнер клуба
1. транзакция
2. участие
за это отдельное спасибо
учасТники запомнил )

5. Ты делаешь ошибку, предполагая, что сервер Б должен помечать данные как синхронизированные при отдаче их серверу А. Не должен.
я этого не говорил, я собираюсь пометить что синхронизированны вторым SOAP запросом. после того как сервер А успешно запишет данные.
но на тот момент пока сервер А ушел в работу, я хочу запретить менять записи которые обновляются. (dirty read)
зачем хочу "помечать данные как синхронизированные"? чтоб на следующий запрос дай новые/измененные, записей которые не изменились небыло.

В сухом остатке: твоя конкретная задача решается без всяких транзакций хранением таблицы "сервер//дата последнего ордера, полученного от этого сервера" и запросом у сервера ордеров с датами больше даты последнего полученного. Если запрос по каким-то причинам не пройдёт, дата не меняется, и запрос будет повторен через час или как там часто тебе надо.
этого мало, записи меняются

сухой вариант выглядит так.
1. клиент: ожидаю поставку 222 с грузами 42, 43 и 45 .
2. исполнитель: поставка 222 приехала, появились грузы 42 и 45, 43 испорчен/не приехал
4. клиент: груз 42 запакуйте, и вместе с 45 положите на палету, заэтикитируйте палету. (заказ 51)
5. исполнитель: заказ 51 исполнен, грузы 42 и 45 пропали со склада есть палета 88. счет 333
6. клиент: палету 88 отгрузите 20 числа (заказ 52)
7. исполнитель: заказ 52 исполнен, палета 88 отгружена, номер отправки 77, счет 334.

клиент пишет в свою базу а исполнитель в свою, записи автоматом синхронизируются.
 

AnrDaemon

Продвинутый новичок
То, что записи меняются, я не углядел в твоём вопросе. Тогда кипятим обратно к п.4. - нобходимо релизовывать нужный функционал на каждом шаге.

P.S.
Если кто-то спросит моего мнения, нехорошо, что такие вещи могут изменяться в БД. Сложно отследить повреждения данных.
Если бы я организовывал БД, это была бы таблица-лог, с FK на нужные внешние таблицы (заказы, склад, …).
 

WMix

герр M:)ller
Партнер клуба
я думал о логе, но боюсь перемудрить, (был бы готовый), отследить - проще в историю триггерить, и по надобности разбираться. лог есть на складе но он несколько о другом.
чтоб по id не пересекаться выделил на каждом из серверов по диапазону шириной в мио, может глупо но на этом этапе точно хватит. новые записи это те у которых дата изменения больше даты синхронизации, беру все новые записи из всех описанных табличек, и простой insert on duplicate key update. а сейчас подвисаю на конфликтах, те. записи лочатся только при синхронизации, а дальше легко может произойти что одна и таже запись была изменена в обоих базах, а проверок на поля нет (

тут есть облегчение, исполнитель меняет другие поля чем заказчик, но табличку делить не хочется. ищу место куда логику вставить
 

antson

Новичок
Партнер клуба
id,sync,begin_sync_time, .... любые данные
вот тут камень . по тексту звучало select for update
Трабла в том, что в виде
select id where
чтото делаем в другой таблице
update sync=yes where id=

при параллельном запуске не работает как семафор, т.е. синхра сделается дважды.
1.выборка
2. флаг в процессе
3.делаем
4. флаг синхронизировано тоже.


Мой вариант
1. найти ид со статусом требуетсинхры
2. выполнить запрос set status='впроцессе',worker='сервер-пид',sync_ts='текущее время' where ид=из_п.1 and status='требуетсинхры'
3. Если аффектед роу меньше 1 и число_попыток_не_превышено и не вышел таймаут то п.1
4. делаем синхру
5. ставим статус завершено где ид=из_п.1 and status='впроцессе' и worker='наш'
6.Если аффектед роу меньше 1, был сбой

и ни каких функций типа ++ или now()
 

WMix

герр M:)ller
Партнер клуба
2. выполнить запрос set status='впроцессе',worker='сервер-пид',sync_ts='текущее время' where ид=из_п.1 and status='требуетсинхры'
3. Если аффектед роу меньше 1 и число_попыток_не_превышено и не вышел таймаут то п.1
таймаут вышел, а sync_ts='текущее время'..

моделька такая, получилось многословно и я уже не знаю правильный ли путь я выбрал
таблица константных значений
PHP:
CREATE TABLE IF NOT EXISTS `sys__constants` (
  `ckey` varchar(25) NOT NULL,
  `cval` varchar(255) NOT NULL,
  UNIQUE KEY `name` (`ckey`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

INSERT INTO `sys__constants` (`ckey`, `cval`) VALUES
('sync_min_id', '1000000'),
('sync_max_id', '1999999');
табличка для тестов
PHP:
CREATE TABLE IF NOT EXISTS `sync` (
  `id` int(10) unsigned NOT NULL,
  `name` varchar(255) NOT NULL,
  `adate` datetime NOT NULL COMMENT 'создание',
  `udate` datetime NOT NULL COMMENT 'обновление',
  `sdate` datetime DEFAULT NULL COMMENT 'синхронизация',
  `ldate` datetime DEFAULT NULL COMMENT 'блокировка',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
автоинкремент в диапазоне
PHP:
DROP TRIGGER IF EXISTS `create_sync`;
DELIMITER //
CREATE TRIGGER `create_sync` BEFORE INSERT ON `sync`
FOR EACH ROW BEGIN
  DECLARE min_id, max_id, next_id INT UNSIGNED;
  DECLARE msg VARCHAR(150);

  SET NEW.adate = NOW();
  SET NEW.udate = NOW();

  SELECT CAST(cval AS UNSIGNED) INTO min_id
  FROM sys__constants WHERE ckey = 'sync_min_id';

  SELECT CAST(cval AS UNSIGNED) INTO max_id
  FROM sys__constants WHERE ckey = 'sync_max_id';

  IF NEW.id IS NULL OR NEW.id <= 0 OR NEW.id = ''
  THEN
    SELECT IFNULL( MAX(id), min_id ) INTO next_id
    FROM sync WHERE id BETWEEN min_id AND max_id;
    IF next_id > max_id
    THEN
      SELECT CONCAT( 'out of range [',min_id,',',max_id,']') INTO msg;
      SIGNAL SQLSTATE '40002' SET MESSAGE_TEXT = msg;
    END IF;
    SET NEW.id = next_id;
  ELSEIF (NEW.id < min_id OR NEW.id > max_id)
  THEN SET NEW.sdate = NOW();
  END IF;
END
//
DELIMITER ;
можно потестить
PHP:
TRUNCATE TABLE  `sync`;
INSERT INTO sync SET               name = "первая запись (id=1000000)";
INSERT INTO sync SET               name = "вторая запись (id=1000001)";
INSERT INTO sync SET id = 1000005, name = "третия запись с id=1000005";
INSERT INTO sync SET               name = "четвертая запись (id=1000006)";
INSERT INTO sync SET id = 2000001, name = "запись за пределами интервала с автосинхронизацией";
INSERT INTO sync SET               name = "пятая запись (id=1000007)";
INSERT INTO sync SET id = 1999999, name = "шестая запись с id=1999999 окончен интевал";

INSERT INTO sync SET               name = "ошибка";
синхронизация
PHP:
DROP TRIGGER IF EXISTS `update_sync`;
DELIMITER //
CREATE TRIGGER `update_sync` BEFORE UPDATE ON `sync`
FOR EACH ROW BEGIN
DECLARE msg VARCHAR(150);
IF (NEW.sdate < OLD.udate OR NEW.sdate IS NULL) AND NEW.ldate IS NOT NULL AND OLD.ldate IS NOT NULL
THEN
   SELECT CONCAT( 'record id=', NEW.id,' is locked!') INTO msg;
   SIGNAL SQLSTATE '40001' SET MESSAGE_TEXT = msg;

ELSEIF NEW.sdate >= NEW.udate THEN
   SET NEW.ldate = NULL;
END IF;

SET NEW.udate = NOW();
SET NEW.adate = OLD.adate;

END
//
DELIMITER ;
забираем данные и лочим
PHP:
START TRANSACTION;
SELECT * FROM sync WHERE sdate < udate OR sdate IS NULL FOR UPDATE;
UPDATE sync SET ldate = NOW() WHERE sdate < udate OR sdate IS NULL;
COMMIT;

update sync set name="ошибка" where id = 1000005;
освобождаем
PHP:
START TRANSACTION;
UPDATE  `sync` SET  `sdate` = NOW() WHERE ldate IS NOT NULL;
COMMIT;
 

WMix

герр M:)ller
Партнер клуба
да вроде похоже на правду...
а можно положиться что pconnect не разорвет соеденение между двумя soap запросами?
в смысле если это soap server
думаю сам ответил себе что нет. (разные сессии могут быть)

или вы предлагаете обойти soap и конектиться напрямую базу? гемор будет открыть базу для доступа на dyn-ip, тунели там dyndns ... кривовато, чтот славик я очкую, а если уже коннектиться, нахрен мне 2пц? клиент один! все запутался, пойду еще раз читать..

я правильно понимаю что XA работает только при условии что клиент всегда один, и подключение непрерывное? или я вообще не понял смысла uid в этом замуте! XA просто позволяет перепутать лог запросов (транзакций) от начала до коммита.

моя идея такая
PHP:
$ws->on( 'get', function(){
    $db->set( '{synchronization:1}'); // начали синхронизацию, данные нельзя менять
    //XA START 'xatest';
    /*
    START TRANSACTION;
    SELECT * FROM sync WHERE sdate < udate OR sdate IS NULL FOR UPDATE;
    UPDATE sync SET ldate = NOW() WHERE sdate < udate OR sdate IS NULL;
    COMMIT;
    */
    // XA END 'xatest';
    return $db->get();
});

$ws->on('done', function($time){
    $db->set( '{synchronization:0, last_sync_time:$time}' );
    // XA PREPARE 'xatest';
    // XA COMMIT 'xatest';
    /*
    START TRANSACTION;
    UPDATE  `sync` SET  `sdate` = NOW() WHERE ldate IS NOT NULL;
    COMMIT;
    */
 
});
 
Последнее редактирование:

WMix

герр M:)ller
Партнер клуба
вот если выделить все в отдельного демона который будет постоянно крутится и держать соеденение с базой, то возможно получится. не могу сообразить как это сделать. soap всеже нужен, дает единое api для любой системы.
на первом этапе достаточно решить на php / mysql.
 

grigori

( ͡° ͜ʖ ͡°)
Команда форума
WMix, не вижу зачем тебе постоянное соединение с обеими базами, тебе нужна удаленная синхронизация, а для этого вполне достаточно состояний
 

WMix

герр M:)ller
Партнер клуба
но получается же что между XA END 'xatest'; и XA PREPARE 'xatest'; разрыв соеденения, разве это работает? я на двух консольках попробывал - не работает.
 
Последнее редактирование:

grigori

( ͡° ͜ʖ ͡°)
Команда форума
хмм, тогда напиши драйвер на сокетах, и при разрыве соединения на сервере откатывай транзакцию, а на клиенте реализуй failover - повторную попытку,
морочно, но реально
 

WMix

герр M:)ller
Партнер клуба
вообще задачка мне очень нравится, сейчас уже сделаю простой велосипедик, как уже начал, тем более там не то чтоб большое пересечение данных, но вообще хотелось бы позаниматься и решить правильно. вижу в этом очень даже большой спрос.

те, если я правильно понимаю то на PHP (на java c threads былоб проще) правильно былоб если, крутился демон некий TM (Transaction Manager), с которым коммуницирует AP (Application Program) (то где открыт RPC или REST). как они коммуницируют, совершенно не важно, queue/socket. интереснее придумать протокол передачи данных, и обменом состояний, который по идеи должен повторить 2pc.

меня пугает асинхронность работы через канал между TM и AP, запулить еще ерунда, а как state получить, не запускать же на AP while(!getState())? как в этом случае поступить лучше?

хмм, тогда напиши драйвер на сокетах, и при разрыве соединения на сервере откатывай транзакцию, а на клиенте реализуй failover - повторную попытку,
морочно, но реально
нужно покрутить в голове, пока не соображу
 
Последнее редактирование:
Сверху