

Meu trabalho de conclusão de curso foi sobre um sistema de comunicação de bibliotecas. Desenvolvi o trabalho inicialmente nas aulas de XP e mais tarde, junto com o co-autor da monografia, Leandro Bororo.
Todo o trabalho gira em torno de dois problemas: exportar os dados da base para um formato especial, denominado MARC, e importar os dados desse formato para o banco. Podemos separar isso na discussão legal e na discussão chata. A discussão chata é justamente os objetivos básicos do projeto supracitados. A discussão legal é como criar uma interface WEB para esse componente que permita o acompanhamento do processo de importação ou exportação. Obviamente vou falar sobre o segundo caso.
Desenvolver uma interface WEB para o MARC não é nenhum problema. No entanto, acompanhar a importação/exportação e permitir que o sistema trabalhe com arquivos grandes (mais de 10 megabytes) é outra história. A própria natureza de uma aplicação WEB torna a solução síncrona problemática. A análise de um arquivo MARC pode demorar bastante. O suficiente para a conexão HTTP (cliente-servidor) finalizar por timeout. Além disso, não seria a implementação mais responsiva (se todas as informações são direcionadas para a saída padrão, essas serão exibidas com sorte na linha de comando ou em algum arquivo de registro do servidor, em todo caso, não visível para o cliente). Para resolver isso, o tratamento da informação deveria ser assíncrono.
Inicialmente, poderíamos pensar em enviar as mensagens diretamente para o browser. Mas isso não é possível fazer diretamente. De fato, é o navegador faz as solicitações (request) de um certo recurso (disponível numa localização específica - URL). Uma alternativa seria o uso de Ajax (Asynchronous Javascript And XML) que daria a sensação de que o browser é capaz receber notificações do servidor. Na verdade, no Ajax, é o navegador que faz chamadas remotas de tempos em tempos (sem que o usuário tome alguma ação específica). O fato de a primeira requisição bloquear.
Abaixo, segue o funcionamento da comunicação síncrona entre Browser e servidor que mostra porque o browser irá bloquear ao enviar o arquivo para processamento:
public Response processar (Request request) {
// magia negra para pegar arquivo do request
File file = pegaArquivo(request);
// magia negra para processar o arquivo
Resposta resposta = processa(arquivo);
// magia para transformar resposta em response
return transforma(resposta);
}
A conversa do navegador com o servidor é feito por ciclos de requisições e respostas. Se for necessário que numa requisição o servidor processe um dado arquivo, é preciso esperar todo o processamento para se obter a resposta do servidor. Pode-se separar o processamento da submissão do arquivo, delegando a responsabilidade para outro processo. Exemplo:
public Response processar (Request request) {
// magia negra para pegar arquivo do request
File file = pegaArquivo(request);
// apenas grava o arquivo.. uma outra linha de execução processa o arquivo.
grava(arquivo);
return paginaSucesso();
}
Se o processamento do arquivo é feito na mesma linha de execução da submissão, o cliente só terá uma resposta após o processamento do arquivo.
Para o servidor processar o arquivo de forma assíncrona é necessário que primeiro um "Servlet" processe e grava o arquivo num repositório temporário (seja ele uma fila JMS, um caixa de email, um diretório, etc) e um outro processo, de tempos em tempos, verifique o conteúdo desse repositório.
Esse outro processo pode atualizar um arquivo de log a medida que for necessário (por exmplo: a medida que os registros Marcs forem processados) . Dessa forma, o cliente poderá "requisitar" o conteúdo do arquivo quantas vezes quiser.
// isso é fácil...
public Resposta processar(Request request) {
descobreIdArquivo(request);
// conteúdo atual do arquivo
String arquivo = leArquivo(idArquivo);
return converteParaResponse(arquivo);
}
A síntese da solução é a seguinte: Num primeiro momento o usuário faria o "upload" do arquivo. Depois o usuário deveria listar o arquivo em importação e consultar os detalhes da importação. O mesmo pode ser feito para a exportação. Uma fila de mensagens assíncronas e um processo em execução paralela devem ser implementados.
Implementamos o processo da seguinte maneira:
O servidor recebe um arquivo, via FormFile (essa classe é um utilitário da biblioteca commons-upload [do projeto Commons da Apache Software Foundation] para tratar multipart/form-data de uma requisição HTTP) e o grava num diretório específico. Em seguida, envia um objeto comando como mensagem para uma fila (com o nome do arquivo já atribuído a uma de suas propriedades). Por outro lado, o sistema mantém uma reserva (pool) de trabalhadores que consomem os "trabalhos" enviados para a fila. Cada processamento é registrado por um registrador (logger) num mapa cuja chave é o nome do arquivo. Um Servlet controla o ciclo de vida desses trabalhadores. Os trabalhadores devem por isso, realizar pequenos blocos de operações e verificar se pode executar o próximo bloco, para evitar bloquear o servidor caso seja necessário reiniciá-lo. Abaixo, segue como foi implementado o trabalhador:
@Override
public void run() {
try {
while (condicao) {
Command command = works.take();
if (NO_MORE_WORK == command) {
break;
}
try {
while (command.hasWork() && condicao) {
command.process();
}
} catch (Exception e) {
e.printStackTrace();
}
command.finish();
}
} catch (InterruptedException e) {
}
}
Nesse trecho de código, o objeto works representa a fila de trabalhos (BlockingQueue do pacote utilitário de concorrência do Java 1.5 e superiores). Ele será o repositório da comunicação Produtor-Consumidor. Command é a classe que implementa o padrão Command [Gof], o comando que será executado. A variável condicao permite que o trabalhador verifique se deve continuar a execução do trabalho atual. Aqui a implementação do comando para a Importação do MARC:
private class ImportMarcCommand extends Command {
private LenientMarcStreamReader reader;
private MarcManager marcManager;
private FileInputStream inputStream;
private String nomeArquivo;
private FileChannel channel;
@Override
public boolean hasWork() {
return reader.hasNext();
}
@Override
public void process() throws Exception {
marcManager.parse(reader.next());
marcManager.getLog().setAvailable(channel.position());
}
public ImportMarcCommand(String nomeDoArquivo, boolean sobrescreveItem,
boolean sobrescreveExemplar) throws NamingException,
IOException {
marcManager = new MarcManager(sobrescreveItem, sobrescreveExemplar);
Logger.getInstance().getLogs().add(marcManager.getLog());
this.nomeArquivo = nomeDoArquivo;
File file = new File(nomeDoArquivo);
inputStream = new FileInputStream(file);
channel = inputStream.getChannel();
marcManager.getLog().setInitialAvailable(file.length());
reader = new LenientMarcStreamReader(inputStream);
}
@Override
public void finish() {
marcManager.getLog().setFinalDate(new Date());
// vou fechar o stream por educação. se eu não conseguir, não vou
// ligar.
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// tenta apagar educadamente.
File f = new File(nomeArquivo);
if (!f.exists() && !f.canWrite()) {
f.delete();
}
}
}
Algumas observações:
O ImportMarcCommand processa um registro MARC por vez (então, o processo pode ser finalizado logo que o registro atual for processado).
O objeto reader é responsável por identificar o registro e mover o indicador da posição de leitura do arquivo.
O objeto channel permite que essa informação (sobre o indicador da posição seja computado) seja computada no cálculo do total decorrido.
O Logger é o repositório com as informações sobre o histórico de cada processamento (tempo inicial, final , etc).
No método finish, é registrado o fim do processamento e o arquivo é apagado.
public class ThreadMonitorServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
@Override
public void destroy() {
for (int i = 0; i < workers.length; i++) {
workers[i].setCondicao(false);
try {
queue.put(ImportWorker.NO_MORE_WORK);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
super.destroy();
}
static final int capacity = 10;
final int numWorkers = 3;
public static BlockingQueue queue = new ArrayBlockingQueue(
capacity);
ImportWorker[] workers = new ImportWorker[numWorkers];
@Override
public void init(ServletConfig servletConfig) throws ServletException {
super.init(servletConfig);
for (int i = 0; i < workers.length; i++) {
workers[i] = new ImportWorker(queue);
workers[i].start();
}
}
}
O método init recebe o evento de inicialização do servidor. Nesse momento, o Servlet "contrata" os trabalhadores que serão responsáveis pelo processamento dos comandos, isso é, o Servlet atribui a lista de trabalho e inicia as threads. No método destroy o Servlet recebe o evento para finalizar e então muda a condição de execução das threads de tal forma que elas parem sua execução.
Declarado no web.xml da seguinte maneira:
<servlet>
<servlet-name>monitor</servlet-name>
<servlet-class>
br.usp.ime.colmeia.async.ThreadMonitorServlet
</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
Depois disso, tem-se a ação para listar a situação do processamento de cada arquivo.
Comentários