From 2dfb0a1d9286817051559885bbdc39a0765f2a73 Mon Sep 17 00:00:00 2001 From: p2k Date: Sat, 23 Jun 2012 08:45:38 +0200 Subject: [PATCH 1/3] Improved CouchDB Share Logger (WiP) --- apps/ecoinpool/src/couchdb_sharelogger.erl | 233 ++++++++++++++---- apps/ecoinpool/src/ecoinpool_rpc.erl | 2 +- apps/ecoinpool/src/ecoinpool_util.erl | 12 +- apps/ecoinpool/src/mysql_sharelogger.erl | 2 +- apps/ecoinpool/src/remote_sharelogger.erl | 76 ++++++ apps/ecoinpool/src/sql_sharelogger.erl | 20 +- .../src/mycouch_replicator.erl | 82 +++--- extras/send_rpc.sh | 4 +- site/README.md | 3 +- test_launch.config.example | 5 +- 10 files changed, 335 insertions(+), 104 deletions(-) create mode 100644 apps/ecoinpool/src/remote_sharelogger.erl diff --git a/apps/ecoinpool/src/couchdb_sharelogger.erl b/apps/ecoinpool/src/couchdb_sharelogger.erl index b8dad8e..2c49235 100644 --- a/apps/ecoinpool/src/couchdb_sharelogger.erl +++ b/apps/ecoinpool/src/couchdb_sharelogger.erl @@ -29,11 +29,40 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { - candidates_only, - conn, - known_dbs + subpool_id :: binary() | any, + chain_logging :: main | aux | both, + log_type :: round | interval | both, + shares_db :: term(), + cachetbl :: ets:tid(), + cachetbln :: ets:tid(), + next_update :: erlang:timestamp() }). +-record(worker_last, { + ip :: string(), + user_agent :: string(), + round :: integer() | undefined +}). + +-record(subpool_last, { + block_num :: integer() | undefined, + prev_block :: binary() | undefined, + target :: binary() | undefined, + round :: integer() | undefined +}). + +-record(entry, { + id :: {binary(), main|aux} | {binary(), binary(), main|aux}, + valids = 0 :: integer(), + invalids = 0 :: integer(), + other_reasons = [] :: [{reject_reason(), integer()}], + last :: #worker_last{} | #subpool_last{}, + timestamp :: erlang:timestamp() +}). + +% This value cannot be changed at runtime, since it would mess up previous statistics +-define(LOG_INTERVAL, 10). + %% =================================================================== %% API functions %% =================================================================== @@ -49,9 +78,35 @@ log_share(LoggerId, Share) -> %% =================================================================== init(Config) -> - CandidatesOnly = proplists:get_value(candidate_shares_only, Config, false), + % Trap exit + process_flag(trap_exit, true), + % Load settings S = ecoinpool_db:get_couchdb_connection(), - {ok, #state{candidates_only=CandidatesOnly, conn=S, known_dbs=sets:new()}}. + DatabaseName = binary_to_list(proplists:get_value(database, Config, <<"shares">>)), + SubpoolId = proplists:get_value(subpool_id, Config, any), + ChainLogging = case proplists:get_value(chain_logging, Config, <<"main">>) of + <<"aux">> -> aux; + <<"both">> -> both; + _ -> main + end, + LogType = case proplists:get_value(log_type, Config, <<"both">>) of + <<"round">> -> round; + <<"interval">> -> interval; + _ -> both + end, + % Create in-memory cache + CacheTbl = ets:new(cachetbl, [set, protected, {keypos, #entry.id}]), + CacheTblN = ets:new(cachetbln, [set, protected, {keypos, #entry.id}]), + % Setup database + {ok, SharesDB} = setup_shares_db(S, DatabaseName), + % Setup interval logging + NextUpdate = if LogType =:= interval; LogType =:= both -> + % Start update timer + timer:send_interval(5000, check_update), % Set to 5 seconds to allow some randomization and avoid possible update conflicts + % Find next update time + get_next_update_ts(); + true -> undefined end, + {ok, #state{subpool_id=SubpoolId, chain_logging=ChainLogging, log_type=LogType, shares_db=SharesDB, cachetbl=CacheTbl, cachetbln=CacheTblN, next_update=NextUpdate}}. handle_call(_, _From, State) -> {reply, error, State}. @@ -60,7 +115,7 @@ handle_cast(#share{ timestamp=Timestamp, server_id=local, - subpool_name=SubpoolName, + subpool_id=SubpoolId, worker_id=WorkerId, user_id=UserId, ip=IP, @@ -82,53 +137,42 @@ handle_cast(#share{ aux_block_num=AuxBlockNum, aux_prev_block=AuxPrevBlock, aux_round=AuxRound}, + SState=#state{ - candidates_only=CandidatesOnly, - conn=S, - known_dbs=KnownDBs}) -> + subpool_id=FilterSubpoolId, + chain_logging=ChainLogging, + log_type=LogType, + shares_db=SharesDB, + cachetbl=CacheTbl, + cachetbln=CacheTblN, + next_update=NextUpdate + }) when + (SubpoolId =:= FilterSubpoolId) or (FilterSubpoolId =:= any) -> - KnownDBs1 = case sets:is_element(SubpoolName, KnownDBs) of - true -> KnownDBs; - false -> setup_shares_db(S, SubpoolName), sets:add_element(SubpoolName, KnownDBs) - end, - {ok, DB} = couchbeam:open_db(S, SubpoolName), - if - not CandidatesOnly; State =:= candidate -> - case State of - invalid -> - store_invalid_share_in_db(Timestamp, WorkerId, UserId, IP, UserAgent, RejectReason, Hash, undefined, Target, BlockNum, PrevBlock, Round, DB); - _ -> - store_share_in_db(Timestamp, WorkerId, UserId, IP, UserAgent, State, Hash, undefined, Target, BlockNum, PrevBlock, Data, Round, DB) - end; - true -> - ok + Tbl = case timer:now_diff(Timestamp, NextUpdate) of + D when D < 0 -> CacheTbl; + _ -> CacheTblN end, - case AuxpoolName of - undefined -> - {noreply, SState#state{known_dbs=KnownDBs1}}; - _ -> - KnownDBs2 = case sets:is_element(AuxpoolName, KnownDBs) of - true -> KnownDBs1; - false -> setup_shares_db(S, AuxpoolName), sets:add_element(SubpoolName, KnownDBs1) - end, - {ok, AuxDB} = couchbeam:open_db(S, AuxpoolName), - if - not CandidatesOnly; AuxState =:= candidate -> - case AuxState of - invalid -> - store_invalid_share_in_db(Timestamp, WorkerId, UserId, IP, UserAgent, RejectReason, AuxHash, Hash, AuxTarget, AuxBlockNum, AuxPrevBlock, AuxRound, AuxDB); - _ -> - store_share_in_db(Timestamp, WorkerId, UserId, IP, UserAgent, AuxState, AuxHash, Hash, AuxTarget, AuxBlockNum, AuxPrevBlock, Data, AuxRound, AuxDB) - end; - true -> - ok - end, - {noreply, SState#state{known_dbs=KnownDBs2}} - end; - -handle_cast(#share{}, State) -> % Ignore other shares (currently these are remote shares) + + % Handle for main chain + if ChainLogging =:= main; ChainLogging =:= both -> + update_subpool(Tbl, SubpoolId, main, Timestamp, State, RejectReason, BlockNum, PrevBlock, Target, Round), + update_worker(Tbl, SubpoolId, WorkerId, main, Timestamp, State, RejectReason, IP, UserAgent, Round); + true -> ok end, + % Handle for aux chain + if (ChainLogging =:= aux) or (ChainLogging =:= both), AuxState =/= undefined -> + AuxRejectReason = if AuxState =:= invalid, RejectReason =:= undefined -> stale; true -> RejectReason end, + update_subpool(Tbl, SubpoolId, aux, Timestamp, AuxState, AuxRejectReason, AuxBlockNum, AuxPrevBlock, AuxTarget, AuxRound), + update_worker(Tbl, SubpoolId, WorkerId, aux, Timestamp, AuxState, AuxRejectReason, IP, UserAgent, AuxRound); + true -> ok end, + {noreply, State}; + +handle_cast(#share{}, State) -> % Ignore other shares {noreply, State}. +handle_info(check_update, State) -> + + handle_info(_, State) -> {noreply, State}. @@ -142,20 +186,107 @@ code_change(_OldVsn, State, _Extra) -> %% Other functions %% =================================================================== -setup_shares_db(S, SubpoolName) -> - case couchbeam:open_or_create_db(S, SubpoolName) of +setup_shares_db(S, DatabaseName) -> + case couchbeam:open_or_create_db(S, DatabaseName) of {ok, DB} -> lists:foreach(fun ecoinpool_db:check_design_doc/1, [ {DB, "stats", "shares_db_stats.json"}, {DB, "timed_stats", "shares_db_timed_stats.json"}, {DB, "auth", "shares_db_auth.json"} ]), - ok; + {ok, DB}; {error, Error} -> log4erl:error(db, "shares_db - couchbeam:open_or_create_db/3 returned an error:~n~p", [Error]), error end. +get_next_update_ts() -> + {Date, Time} = calendar:now_to_datetime(erlang:now()), + Secs = calendar:time_to_seconds(Time), + NextDateTime = case ecoinpool_util:ceiling(Secs / (60 * ?LOG_INTERVAL)) * (60 * ?LOG_INTERVAL) of + 86400 -> + Days = calendar:date_to_gregorian_days(Date), + {calendar:gregorian_days_to_date(Days + 1), {0,0,0}}; + NextSecs -> + {Date, calendar:seconds_to_time(NextSecs)} + end, + calendar:datetime_to_now(NextDateTime). + +update_subpool(Tbl, SubpoolId, Chain, Timestamp, State, RejectReason, BlockNum, PrevBlock, Target, Round) -> + Entry = case ets:lookup(Tbl, {SubpoolId, Chain}) of + [] -> #entry{id={SubpoolId, Chain}}; + [E] -> E + end, + SubpoolLast = update_subpool_last(Entry#entry.last, BlockNum, PrevBlock, Target, Round), + ets:insert(Tbl, update_entry(Entry, Timestamp, State, RejectReason, SubpoolLast)). + +update_worker(Tbl, SubpoolId, WorkerId, Chain, Timestamp, State, RejectReason, IP, UserAgent, Round) -> + Entry = case ets:lookup(Tbl, {SubpoolId, WorkerId, Chain}) of + [] -> #entry{id={SubpoolId, WorkerId, Chain}}; + [E] -> E + end, + WorkerLast = update_worker_last(Entry#entry.last, IP, UserAgent, Round), + ets:insert(Tbl, update_entry(Entry, Timestamp, State, RejectReason, WorkerLast)). + +update_entry(Entry=#entry{valids=Valids, invalids=Invalids}, Timestamp, State, RejectReason, Last) -> + case State of + invalid -> + case RejectReason of + stale -> + Entry#entry{ + invalids=Invalids + 1, + last=Last, + timestamp=Timestamp + }; + _ -> + OtherReasons = Entry#entry.other_reasons, + OtherReasonCount = case lists:keyfind(RejectReason, 1, OtherReasons) of + false -> 0; + {_, C} -> C + end, + Entry#entry{ + invalids=Invalids + 1, + other_reasons=lists:keystore(RejectReason, 1, OtherReasons, {RejectReason, OtherReasonCount + 1}), + last=Last, + timestamp=Timestamp + } + end; + _ -> % valid | candidate + Entry#entry{ + valids=Valids + 1, + last=Last, + timestamp=Timestamp + } + end. + +update_subpool_last(undefined, BlockNum, PrevBlock, Target, Round) -> + #subpool_last{ + block_num=BlockNum, + prev_block=PrevBlock, + target=Target, + round=Round + }; +update_subpool_last(#subpool_last{block_num=LastBlockNum, prev_block=LastPrevBlock, target=LastTarget, round=LastRound}, BlockNum, PrevBlock, Target, Round) -> + #subpool_last{ + block_num=case BlockNum of undefined -> LastBlockNum; _ -> BlockNum end, + prev_block=case PrevBlock of undefined -> LastPrevBlock; _ -> PrevBlock end, + target=case Target of undefined -> LastTarget; _ -> Target end, + round=case Round of undefined -> LastRound; _ -> Round end + }. + +update_worker_last(undefined, IP, UserAgent, Round) -> + #worker_last{ + ip=IP, + user_agent=UserAgent, + round=Round + }; +update_worker_last(#worker_last{ip=LastIP, user_agent=LastUserAgent, round=LastRound}, IP, UserAgent, Round) -> + #worker_last{ + ip=case IP of undefined -> LastIP; _ -> IP end, + user_agent=case UserAgent of undefined -> LastUserAgent; _ -> UserAgent end, + round=case Round of undefined -> LastRound; _ -> Round end + }. + -spec make_share_document(Timestamp :: erlang:timestamp(), WorkerId :: binary(), UserId :: term(), IP :: string(), UserAgent :: string(), State :: valid | candidate, Hash :: binary(), ParentHash :: binary() | undefined, Target :: binary(), BlockNum :: integer(), PrevBlock :: binary(), BData :: binary(), Round :: integer()) -> {[]}. make_share_document(Timestamp, WorkerId, UserId, IP, UserAgent, State, Hash, ParentHash, Target, BlockNum, PrevBlock, BData, Round) -> {{YR,MH,DY}, {HR,ME,SD}} = calendar:now_to_datetime(Timestamp), diff --git a/apps/ecoinpool/src/ecoinpool_rpc.erl b/apps/ecoinpool/src/ecoinpool_rpc.erl index 6490577..92d6d76 100644 --- a/apps/ecoinpool/src/ecoinpool_rpc.erl +++ b/apps/ecoinpool/src/ecoinpool_rpc.erl @@ -117,7 +117,7 @@ headers_from_options(Options) -> (longpolling, AccHeaders) -> [{"X-Long-Polling", "/LP"} | AccHeaders]; (rollntime, AccHeaders) -> - [{"X-Roll-NTime", "expire=10"} | AccHeaders]; + [{"X-Roll-NTime", "expire=120"} | AccHeaders]; ({reject_reason, Reason}, AccHeaders) -> [{"X-Reject-Reason", Reason} | AccHeaders]; ({block_num, BlockNum}, AccHeaders) -> diff --git a/apps/ecoinpool/src/ecoinpool_util.erl b/apps/ecoinpool/src/ecoinpool_util.erl index 50db269..c01986e 100644 --- a/apps/ecoinpool/src/ecoinpool_util.erl +++ b/apps/ecoinpool/src/ecoinpool_util.erl @@ -48,7 +48,9 @@ make_json_password/3, daemon_storage_dir/2, - server_storage_dir/1 + server_storage_dir/1, + + ceiling/1 ]). -on_load(module_init/0). @@ -244,3 +246,11 @@ server_storage_dir(SubpoolId) -> _ -> file:make_dir(Dirname) end, Dirname. + +ceiling(X) -> + T = erlang:trunc(X), + case (X - T) of + Neg when Neg < 0 -> T; + Pos when Pos > 0 -> T + 1; + _ -> T + end. diff --git a/apps/ecoinpool/src/mysql_sharelogger.erl b/apps/ecoinpool/src/mysql_sharelogger.erl index 265542b..2f84769 100644 --- a/apps/ecoinpool/src/mysql_sharelogger.erl +++ b/apps/ecoinpool/src/mysql_sharelogger.erl @@ -65,7 +65,7 @@ disconnect(Conn) -> end. fetch_result(Conn, Query) -> - case mysql_conn:fetch(Conn, iolist_to_binary(Query), self(), 10000) of + case mysql_conn:fetch(Conn, iolist_to_binary(Query), self(), 30000) of {data, MyFieldsResult} -> {ok, mysql:get_result_rows(MyFieldsResult)}; {updated, MyUpdateResult} -> {ok, mysql:get_result_affected_rows(MyUpdateResult)}; {error, #mysql_result{error=Reason}} -> {error, Reason}; diff --git a/apps/ecoinpool/src/remote_sharelogger.erl b/apps/ecoinpool/src/remote_sharelogger.erl new file mode 100644 index 0000000..49e2cf6 --- /dev/null +++ b/apps/ecoinpool/src/remote_sharelogger.erl @@ -0,0 +1,76 @@ + +%% +%% Copyright (C) 2011 Patrick "p2k" Schneider +%% +%% This file is part of ecoinpool. +%% +%% ecoinpool is free software: you can redistribute it and/or modify +%% it under the terms of the GNU General Public License as published by +%% the Free Software Foundation, either version 3 of the License, or +%% (at your option) any later version. +%% +%% ecoinpool is distributed in the hope that it will be useful, +%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%% GNU General Public License for more details. +%% +%% You should have received a copy of the GNU General Public License +%% along with ecoinpool. If not, see . +%% + +-module(remote_sharelogger). +-behaviour(gen_sharelogger). +-behaviour(gen_server). + +-include("gen_sharelogger_spec.hrl"). + +-export([start_link/2, log_share/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link(LoggerId, Config) -> + gen_server:start_link({local, LoggerId}, ?MODULE, Config, []). + +log_share(LoggerId, Share) -> + gen_server:cast(LoggerId, Share). + +%% =================================================================== +%% Gen_Server callbacks +%% =================================================================== + +init(Config) -> + RegName = binary_to_atom(proplists:get_value(registered_name, Config), utf8), + case proplists:get_value(node, Config) of + undefined -> ok; + Node -> pong = net_adm:ping(binary_to_atom(Node, utf8)) + end, + UseGenServer = proplists:get_value(use_gen_server, Config, true), + {ok, MyServerId} = application:get_env(ecoinpool, server_id), + {ok, {RegName, MyServerId, UseGenServer}}. + +handle_call(_, _From, State) -> + {reply, error, State}. + +handle_cast(Share=#share{server_id=local}, State={RegName, MyServerId, UseGenServer}) -> + if + UseGenServer -> + gen_server:cast({global, RegName}, Share#share{server_id=MyServerId}); + true -> + global:send(RegName, Share#share{server_id=MyServerId}) + end, + {noreply, State}; +handle_cast(_, State) -> + {noreply, State}. + +handle_info(_, State) -> + {noreply, State}. + +terminate(_, _) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/apps/ecoinpool/src/sql_sharelogger.erl b/apps/ecoinpool/src/sql_sharelogger.erl index c61fff7..32bc041 100644 --- a/apps/ecoinpool/src/sql_sharelogger.erl +++ b/apps/ecoinpool/src/sql_sharelogger.erl @@ -51,7 +51,7 @@ log_remote :: boolean(), subpool_id :: binary() | any, - log_type :: main | aux | both, + chain_logging :: main | aux | both, commit_interval :: integer(), always_log_data :: boolean(), conv_ts :: fun((erlang:timestamp()) -> calendar:datetime()), @@ -85,7 +85,7 @@ init({LoggerId, SQLModule, Config}) -> Table = proplists:get_value(table, Config, <<"shares">>), LogRemote = proplists:get_value(log_remote, Config, false), SubpoolId = proplists:get_value(subpool_id, Config, any), - LogType = case proplists:get_value(log_type, Config, <<"main">>) of + ChainLogging = case proplists:get_value(chain_logging, Config, <<"main">>) of <<"aux">> -> aux; <<"both">> -> both; _ -> main @@ -127,7 +127,7 @@ init({LoggerId, SQLModule, Config}) -> log_remote = LogRemote, subpool_id = SubpoolId, - log_type = LogType, + chain_logging = ChainLogging, commit_interval = case CommitTimer of undefined -> 0; _ -> CommitInterval end, always_log_data = AlwaysLogData, conv_ts = ConvTS, @@ -172,7 +172,7 @@ handle_cast(#share{ logger_id=LoggerId, log_remote=LogRemote, subpool_id=FilterSubpoolId, - log_type=LogType, + chain_logging=ChainLogging, always_log_data = AlwaysLogData, conv_ts=ConvTS, commit_timer=CommitTimer, @@ -183,8 +183,8 @@ handle_cast(#share{ Solution = if AlwaysLogData; - ShareState =:= candidate, LogType =/= aux; - AuxState =:= candidate, LogType =/= main -> + ShareState =:= candidate, ChainLogging =/= aux; + AuxState =:= candidate, ChainLogging =/= main -> conv_binary_data(Data); true -> undefined @@ -202,11 +202,11 @@ handle_cast(#share{ }, SQLShare1 = if - LogType =:= main; - LogType =:= both -> + ChainLogging =:= main; + ChainLogging =:= both -> BinBlockNum = conv_block_num(BlockNum), PrevBlockHash = conv_binary_data(PrevBlock), - case LogType of + case ChainLogging of both when AuxpoolName =/= undefined -> BinAuxBlockNum = conv_block_num(AuxBlockNum), SQLShare#sql_share{ @@ -290,7 +290,7 @@ handle_info(insert_sql_shares, State=#state{ CommitTimer end; _ -> - log4erl:warn("Could not send all shares at once due to size limit, continuing later (~p).", [LoggerId]), + log4erl:warn("Could not send all shares at once due to size limit, continuing later, ~b shares left (~p).", [length(RestSQLShares), LoggerId]), CommitTimer end, {noreply, State#state{query_size_limit=QuerySizeLimit, sql_conn=SQLConn, error_count=0, commit_timer=NewCommitTimer, sql_shares=RestSQLShares}}; diff --git a/apps/ecoinpool_mysql_replicator/src/mycouch_replicator.erl b/apps/ecoinpool_mysql_replicator/src/mycouch_replicator.erl index 28bbff3..9aa528e 100644 --- a/apps/ecoinpool_mysql_replicator/src/mycouch_replicator.erl +++ b/apps/ecoinpool_mysql_replicator/src/mycouch_replicator.erl @@ -2,7 +2,7 @@ %% %% mycouch_replicator - A CouchDB and MySQL replication engine %% -%% Copyright (C) 2011 Patrick "p2k" Schneider +%% Copyright (C) 2011-2012 Patrick "p2k" Schneider %% %% This program is free software: you can redistribute it and/or modify %% it under the terms of the GNU General Public License as published by @@ -40,18 +40,6 @@ % Internal state record -record(state, { - couch_db, - - my_pool_id, - my_table, - my_id_field, - my_timer, - - my_queries, - - couch_to_my, - my_to_couch, - cancel_echo }). @@ -72,6 +60,9 @@ % - CouchDB ID fields have to be strings, MySQL ID fields have to be integers. % - Both CouchToMy and MyToCouch must be functions which take one property list % as parameter and return one (possibly converted) property list as result. +% - Additionally CouchToMy and MyToCouch must accept the atom "new" and return +% a property list of default values (possibly for other fields which are not +% covered by the subsequent conversion call). % - The keys of the property lists coming from MySQL will be converted to lower % case strings prior to the call to MyToCouch; property lists for CouchDB, % on the other hand, have binaries as their keys in all cases. @@ -177,28 +168,34 @@ init([CouchDb, MyPoolId, MyTable, MyTriggerFields, MyInterval, CouchToMy, MyToCo DataQ = list_to_atom(lists:concat([MyTable, "_data_q"])), mysql:prepare(DataQ, iolist_to_binary(["SELECT * FROM `", MyTable, "` WHERE `", MyIdField, "` = ?;"])), DelQ = list_to_atom(lists:concat([MyTable, "_del_q"])), - mysql:prepare(DelQ, iolist_to_binary(["DELETE FROM `pool_worker` WHERE `", MyIdField, "` = ?;"])), + mysql:prepare(DelQ, iolist_to_binary(["DELETE FROM `", MyTable, "` WHERE `", MyIdField, "` = ?;"])), DelRevQ = list_to_atom(lists:concat([MyTable, "_del_rev_q"])), - mysql:prepare(DelRevQ, <<"DELETE FROM `pool_worker_rev` WHERE `my_id` = ?;">>), + mysql:prepare(DelRevQ, iolist_to_binary(["DELETE FROM `", MyTable, "_rev` WHERE `my_id` = ?;"])), MyQueries = #queries{changes=ChangesQ, rev=RevQ, couch_id=CouchIdQ, upd_rev=UpdRevQ, ins_rev=InsRevQ, del_rev=DelRevQ, data=DataQ, del=DelQ}, % Schedule changes polling timer {ok, MyTimer} = timer:send_interval(MyInterval * 1000, check_my_changes), + % Put immutable values into the process dictionary + % Conversion functions can access them too, like this + put(couch_db, CouchDb), + + put(my_pool_id, MyPoolId), + put(my_table, MyTable), + put(my_id_field, MyIdField), + put(my_timer, MyTimer), + + put(my_queries, MyQueries), + + put(couch_to_my, CouchToMy), + put(my_to_couch, MyToCouch), + {ok, #state{ - couch_db = CouchDb, - my_pool_id = MyPoolId, - my_table = MyTable, - my_id_field = MyIdField, - my_timer = MyTimer, - my_queries = MyQueries, - couch_to_my = CouchToMy, - my_to_couch = MyToCouch, cancel_echo = queue:new() }}. -handle_change({ChangeProps}, State=#state{couch_db=CouchDb, my_pool_id=MyPoolId, my_table=MyTable, my_id_field=MyIdField, my_queries=MyQueries, couch_to_my=CouchToMy, cancel_echo=CancelEcho}) -> +handle_change({ChangeProps}, State=#state{cancel_echo=CancelEcho}) -> CouchId = proplists:get_value(<<"id">>, ChangeProps), case queue:peek(CancelEcho) of {value, CouchId} -> @@ -209,6 +206,8 @@ handle_change({ChangeProps}, State=#state{couch_db=CouchDb, my_pool_id=MyPoolId, [{[{<<"rev">>, Rev}]}] = proplists:get_value(<<"changes">>, ChangeProps), Deleted = proplists:get_value(<<"deleted">>, ChangeProps, false), + MyPoolId = get(my_pool_id), + MyQueries = get(my_queries), {data, MyRevData} = mysql:execute(MyPoolId, MyQueries#queries.rev, [CouchId]), case mysql:get_result_rows(MyRevData) of [] -> @@ -217,7 +216,7 @@ handle_change({ChangeProps}, State=#state{couch_db=CouchDb, my_pool_id=MyPoolId, ok; true -> % Missing -> insert ?Log("CouchId ~s: Inserting.", [CouchId]), - insert_into_mysql(CouchDb, CouchId, Rev, MyPoolId, MyTable, MyQueries, CouchToMy) + insert_into_mysql(get(couch_db), CouchId, Rev, MyPoolId, get(my_table), MyQueries, get(couch_to_my)) end; [{MyId, MyRev, MyDeleted}] -> if @@ -232,14 +231,15 @@ handle_change({ChangeProps}, State=#state{couch_db=CouchDb, my_pool_id=MyPoolId, ok; true -> % Not on same revision (and maybe conflicting) -> take precedence and update ?Log("CouchId ~s: MyId ~b: Updating.", [CouchId, MyId]), - update_mysql(CouchDb, CouchId, Rev, MyPoolId, MyTable, MyIdField, MyId, MyQueries, CouchToMy) + update_mysql(get(couch_db), CouchId, Rev, MyPoolId, get(my_table), get(my_id_field), MyId, MyQueries, get(couch_to_my)) end end, {noreply, State} end. -handle_my_change(MyPoolId, CouchDb, MyId, CouchId, MyRev, Deleted, MyQueries, MyToCouch) -> +handle_my_change(MyPoolId, MyId, CouchId, MyRev, Deleted, MyQueries) -> + CouchDb = get(couch_db), case couchbeam:lookup_doc_rev(CouchDb, CouchId) of {error, _} -> if @@ -249,7 +249,7 @@ handle_my_change(MyPoolId, CouchDb, MyId, CouchId, MyRev, Deleted, MyQueries, My false; true -> % Missing -> insert ?Log("MyId ~b: Inserting.", [MyId]), - insert_into_couchdb(MyPoolId, MyId, CouchDb, MyQueries, MyToCouch), + insert_into_couchdb(MyPoolId, MyId, CouchDb, MyQueries, get(my_to_couch)), true end; Rev -> @@ -263,7 +263,7 @@ handle_my_change(MyPoolId, CouchDb, MyId, CouchId, MyRev, Deleted, MyQueries, My Rev =:= MyRev -> ?Log("MyId ~b: CouchId ~s: Updating.", [MyId, CouchId]); true -> ?Log("MyId ~b: CouchId ~s: Overriding conflict.", [MyId, CouchId]) end, - update_couchdb(MyPoolId, MyId, CouchDb, CouchId, Rev, MyQueries, MyToCouch) + update_couchdb(MyPoolId, MyId, CouchDb, CouchId, Rev, MyQueries, get(my_to_couch)) end end. @@ -273,8 +273,10 @@ handle_call(_Message, _From, State) -> handle_cast(_Message, State) -> {noreply, State}. -handle_info(check_my_changes, State=#state{couch_db=CouchDb, my_pool_id=MyPoolId, my_queries=MyQueries, my_to_couch=MyToCouch, cancel_echo=CancelEcho}) -> +handle_info(check_my_changes, State=#state{cancel_echo=CancelEcho}) -> % Query for changes + MyPoolId = get(my_pool_id), + MyQueries = get(my_queries), {data, MyChangesData} = mysql:execute(MyPoolId, MyQueries#queries.changes, []), case mysql:get_result_rows(MyChangesData) of [] -> % No changes @@ -282,7 +284,7 @@ handle_info(check_my_changes, State=#state{couch_db=CouchDb, my_pool_id=MyPoolId MyChanges -> {noreply, State#state{cancel_echo = lists:foldl( fun ({MyId, CouchId, MyRev, Deleted}, CancelEchoAcc) -> - case handle_my_change(MyPoolId, CouchDb, MyId, CouchId, MyRev, Deleted =:= 1, MyQueries, MyToCouch) of + case handle_my_change(MyPoolId, MyId, CouchId, MyRev, Deleted =:= 1, MyQueries) of true -> queue:in(CouchId, CancelEchoAcc); false -> CancelEchoAcc end @@ -295,10 +297,10 @@ handle_info(check_my_changes, State=#state{couch_db=CouchDb, my_pool_id=MyPoolId handle_info(_Message, State) -> {noreply, State}. -terminate(_Reason, #state{my_timer=MyTimer, my_queries=MyQueries}) -> - [_|Names] = tuple_to_list(MyQueries), +terminate(_Reason, #state{}) -> + [_|Names] = tuple_to_list(get(my_queries)), lists:foreach(fun mysql:unprepare/1, Names), - timer:cancel(MyTimer), + timer:cancel(get(my_timer)), ok. % ----- @@ -306,8 +308,12 @@ terminate(_Reason, #state{my_timer=MyTimer, my_queries=MyQueries}) -> insert_into_mysql(CouchDb, CouchId, Rev, MyPoolId, MyTable, #queries{ins_rev=InsRevQ}, CouchToMy) -> % Retrieve CouchDB document {ok, {DocProps}} = couchbeam:open_doc(CouchDb, CouchId), + % Create the default data set + MyDefaults = lists:keysort(1, CouchToMy(new)), % Convert - MyProps = CouchToMy(DocProps), + MyConverted = lists:keysort(1, CouchToMy(DocProps)), + % Merge + MyProps = filter_undefined(lists:ukeymerge(1, MyConverted, MyDefaults)), % Create INSERT statement {MyKeys, MyValues} = lists:unzip([{[$`, Key ,$`], mysql:encode(Value)} || {Key, Value} <- MyProps]), % Insert data @@ -350,8 +356,12 @@ insert_into_couchdb(MyPoolId, MyId, CouchDb, #queries{couch_id=CouchIdQ, upd_rev [{CouchId}] = mysql:get_result_rows(CouchIdResult), % Retrieve MySQL row MyProps = get_mysql_row_props(MyPoolId, MyId, DataQ), + % Create the default data set + DocDefaults = lists:keysort(1, MyToCouch(new)), % Convert - DocProps = filter_undefined(MyToCouch(MyProps)), + DocConverted = lists:keysort(1, MyToCouch(MyProps)), + % Merge + DocProps = filter_undefined(lists:ukeymerge(1, DocConverted, DocDefaults)), % Save new document and get rev {ok, {DocPropsSaved}} = couchbeam:save_doc(CouchDb, {[{<<"_id">>, CouchId} | DocProps]}), Rev = proplists:get_value(<<"_rev">>, DocPropsSaved), diff --git a/extras/send_rpc.sh b/extras/send_rpc.sh index 89e412f..5c6a4dd 100755 --- a/extras/send_rpc.sh +++ b/extras/send_rpc.sh @@ -12,8 +12,10 @@ AUTH=user:pass if [ "$2" == "" ];then curl http://$AUTH@$HOST:$PORT -X POST -d "{\"method\":\"$1\",\"id\":1}" -H 'Content-Type: application/json' -else +elif [ "$3" == "" ];then curl http://$AUTH@$HOST:$PORT -X POST -d "{\"method\":\"$1\",\"params\":[\"$2\"],\"id\":1}" -H 'Content-Type: application/json' +else + curl http://$AUTH@$HOST:$PORT -X POST -d "{\"method\":\"$1\",\"params\":[\"$2\",\"$3\"],\"id\":1}" -H 'Content-Type: application/json' fi echo "" diff --git a/site/README.md b/site/README.md index ccc86d9..24964bf 100644 --- a/site/README.md +++ b/site/README.md @@ -11,7 +11,8 @@ In order to build the site design documents, you need a copy of or symlink the compiler.jar into this folder. Secondly, get [Sass](http://sass-lang.com/), a cascading style sheet compiler. -The `sass` executable has to be in your `PATH`. +The `sass` executable has to be in your `PATH`. You also need +[compass](http://compass-style.org/), an enhancement library for sass. You should also have `escript` in your `PATH` which usually comes with your Erlang installation. diff --git a/test_launch.config.example b/test_launch.config.example index 0e4bc22..d7affb4 100644 --- a/test_launch.config.example +++ b/test_launch.config.example @@ -7,9 +7,10 @@ [ % SASL is Erlang's internal error and crash logger; it also logs starting % and stopping of certain processes. I set it to "error" here so it won't - % pollute stdout/stderr. + % pollute the logfile with that information. {sasl, [ - {errlog_type, error} + {errlog_type, error}, + {sasl_error_logger, {file, "log/error.log"}} ]}, % This is ecoinpool's main configuration. The CouchDB connection is From 1568885ff5db3ea3a511827d9e9e7ffa984c0b97 Mon Sep 17 00:00:00 2001 From: p2k Date: Wed, 2 Jan 2013 09:32:32 +0100 Subject: [PATCH 2/3] Fixes and improvements * Fixed Bitcoin RPC call * Fixed dependency issue * Extended timeouts * Added debug messages to SQL share logger --- apps/ecoinpool/src/btc_coindaemon.erl | 2 +- apps/ecoinpool/src/btc_daemon_util.erl | 2 +- apps/ecoinpool/src/ecoinpool_db.erl | 2 +- apps/ecoinpool/src/sql_sharelogger.erl | 5 +++++ rebar.config | 1 + 5 files changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/ecoinpool/src/btc_coindaemon.erl b/apps/ecoinpool/src/btc_coindaemon.erl index b79db33..a908697 100644 --- a/apps/ecoinpool/src/btc_coindaemon.erl +++ b/apps/ecoinpool/src/btc_coindaemon.erl @@ -104,7 +104,7 @@ post_workunit(PID) -> gen_server:cast(PID, post_workunit). send_result(PID, BData) -> - gen_server:call(PID, {send_result, BData}). + gen_server:call(PID, {send_result, BData}, 30000). get_first_tx_with_branches(PID, Workunit) -> gen_server:call(PID, {get_first_tx_with_branches, Workunit}). diff --git a/apps/ecoinpool/src/btc_daemon_util.erl b/apps/ecoinpool/src/btc_daemon_util.erl index b2205a6..89f4844 100644 --- a/apps/ecoinpool/src/btc_daemon_util.erl +++ b/apps/ecoinpool/src/btc_daemon_util.erl @@ -217,7 +217,7 @@ get_default_payout_address(URL, Auth) -> end. get_block_number(URL, Auth) -> - {ok, "200", _ResponseHeaders, ResponseBody} = ecoinpool_util:send_http_req(URL, Auth, "{\"method\":\"getblocknumber\"}"), + {ok, "200", _ResponseHeaders, ResponseBody} = ecoinpool_util:send_http_req(URL, Auth, "{\"method\":\"getblockcount\"}"), {Body} = ejson:decode(ResponseBody), proplists:get_value(<<"result">>, Body) + 1. diff --git a/apps/ecoinpool/src/ecoinpool_db.erl b/apps/ecoinpool/src/ecoinpool_db.erl index 75b64e9..8019e92 100644 --- a/apps/ecoinpool/src/ecoinpool_db.erl +++ b/apps/ecoinpool/src/ecoinpool_db.erl @@ -74,7 +74,7 @@ get_worker_record(WorkerId) -> -spec get_workers_for_subpools(SubpoolIds :: [binary()]) -> [worker()]. get_workers_for_subpools(SubpoolIds) -> - gen_server:call(?MODULE, {get_workers_for_subpools, SubpoolIds}). + gen_server:call(?MODULE, {get_workers_for_subpools, SubpoolIds}, 10000). -spec set_subpool_round(Subpool :: subpool(), Round :: integer()) -> ok. set_subpool_round(#subpool{id=SubpoolId}, Round) -> diff --git a/apps/ecoinpool/src/sql_sharelogger.erl b/apps/ecoinpool/src/sql_sharelogger.erl index 32bc041..ed213e8 100644 --- a/apps/ecoinpool/src/sql_sharelogger.erl +++ b/apps/ecoinpool/src/sql_sharelogger.erl @@ -93,10 +93,13 @@ init({LoggerId, SQLModule, Config}) -> CommitInterval = proplists:get_value(commit_interval, Config, 15), AlwaysLogData = proplists:get_value(always_log_data, Config, false), % Start SQL connection + log4erl:info("Connecting to SQL database (~p)...", [LoggerId]), {ok, SQLConn} = SQLModule:connect(LoggerId, Host, Port, User, Pass, Database), + log4erl:info("SQL connection established (~p).", [LoggerId]), % Check available columns FieldInfo = lists:zip(record_info(fields, sql_share), lists:seq(2, record_info(size, sql_share))), AvailableFieldNames = SQLModule:get_field_names(SQLConn, Table), + log4erl:info("~b SQL column(s) available (~p).", [length(AvailableFieldNames), LoggerId]), {InsertFieldNames, InsertFieldIds} = lists:foldr( fun (N, {IFN, IFI}) -> case proplists:get_value(binary_to_atom(N, utf8), FieldInfo) of @@ -112,6 +115,7 @@ init({LoggerId, SQLModule, Config}) -> ConvTS = make_timestamp_converter(SQLModule:get_timediff(SQLConn)), % Get the query size limit QuerySizeLimit = SQLModule:get_query_size_limit(SQLConn), + log4erl:info("SQL query size limit: ~b (~p).", [QuerySizeLimit, LoggerId]), % Setup commit timer CommitTimer = if not is_integer(CommitInterval); CommitInterval =< 0 -> @@ -120,6 +124,7 @@ init({LoggerId, SQLModule, Config}) -> {ok, T} = timer:send_interval(CommitInterval * 1000, insert_sql_shares), T end, + log4erl:info("SQL share logger initialized, using ~b columns (~p).", [length(InsertFieldNames), LoggerId]), {ok, #state{ logger_id = LoggerId, sql_config = {Host, Port, User, Pass, Database}, diff --git a/rebar.config b/rebar.config index 0776fcd..c0620e3 100644 --- a/rebar.config +++ b/rebar.config @@ -25,6 +25,7 @@ {deps, [ {protobuffs, ".*", {git, "git://github.com/basho/erlang_protobuffs.git", "master"}}, {couchbeam, ".*", {git, "git://github.com/benoitc/couchbeam.git", "master"}}, + {ejson, ".*", {git,"http://github.com/benoitc/ejson.git", "master"}}, {log4erl, ".*", {git, "git://github.com/SemanticSugar/log4erl.git", "master"}}, {mysql, ".*", {git, "git://github.com/elbrujohalcon/erlang-mysql-driver.git", "master"}}, {epgsql, ".*", {git, "git://github.com/wg/epgsql.git", "master"}} From a9a0b4a55b12e73317388e27bb1bb8bd8f3c56ef Mon Sep 17 00:00:00 2001 From: rjquery Date: Wed, 13 Mar 2013 09:49:31 -0300 Subject: [PATCH 3/3] Update couchdb_sharelogger.erl --- apps/ecoinpool/src/couchdb_sharelogger.erl | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/apps/ecoinpool/src/couchdb_sharelogger.erl b/apps/ecoinpool/src/couchdb_sharelogger.erl index 2c49235..6cf5516 100644 --- a/apps/ecoinpool/src/couchdb_sharelogger.erl +++ b/apps/ecoinpool/src/couchdb_sharelogger.erl @@ -1,4 +1,3 @@ - %% %% Copyright (C) 2011 Patrick "p2k" Schneider %% @@ -63,6 +62,8 @@ % This value cannot be changed at runtime, since it would mess up previous statistics -define(LOG_INTERVAL, 10). +-define(GREGORIAN_SECONDS_1970, 62167219200). + %% =================================================================== %% API functions %% =================================================================== @@ -73,6 +74,12 @@ start_link(LoggerId, Config) -> log_share(LoggerId, Share) -> gen_server:cast(LoggerId, Share). +%this function was called but it doesn't exist. +datetime_to_now(DateTime) -> + GSeconds = calendar:datetime_to_gregorian_seconds(DateTime), + ESeconds = GSeconds - ?GREGORIAN_SECONDS_1970, + {ESeconds div 1000000, ESeconds rem 1000000, 0}. + %% =================================================================== %% Gen_Server callbacks %% =================================================================== @@ -171,9 +178,7 @@ handle_cast(#share{}, State) -> % Ignore other shares {noreply, State}. handle_info(check_update, State) -> - - -handle_info(_, State) -> +%handle_info(_, State) -> was defined twice, I'm pointing out, not sure what to do with it. {noreply, State}. terminate(_, _) -> @@ -210,7 +215,7 @@ get_next_update_ts() -> NextSecs -> {Date, calendar:seconds_to_time(NextSecs)} end, - calendar:datetime_to_now(NextDateTime). + datetime_to_now(NextDateTime). update_subpool(Tbl, SubpoolId, Chain, Timestamp, State, RejectReason, BlockNum, PrevBlock, Target, Round) -> Entry = case ets:lookup(Tbl, {SubpoolId, Chain}) of